15 模仿学习
15.1 简介
虽然强化学习不需要有监督学习中的标签数据,但它十分依赖奖励函数的设置。有时在奖励函数上做一些微小的改动,训练出来的策略就会有天差地别。在很多现实场景中,奖励函数并未给定,或者奖励信号极其稀疏,此时随机设计奖励函数将无法保证强化学习训练出来的策略满足实际需要。例如,对于无人驾驶车辆智能体的规控,其观测是当前的环境感知恢复的 3D 局部环境,动作是车辆接下来数秒的具体路径规划,那么奖励是什么?如果只是规定正常行驶而不发生碰撞的奖励为+1,发生碰撞为-100,那么智能体学习的结果则很可能是找个地方停滞不前。具体能帮助无人驾驶小车规控的奖励函数往往需要专家的精心设计和调试。
假设存在一个专家智能体,其策略可以看成最优策略,我们就可以直接模仿这个专家在环境中交互的状态动作数据来训练一个策略,并且不需要用到环境提供的奖励信号。模仿学习(imitation learning)研究的便是这一类问题,在模仿学习的框架下,专家能够提供一系列状态动作对{ ( s t , a t ) } \{(s_t,a_t)\} {( s t , a t )} ,表示专家在环境s t s_t s t 下做出了的动作a t a_t a t ,而模仿者的任务则是利用这些专家数据进行训练,无须奖励信号就可以达到一个接近专家的策略。目前学术界模仿学习的方法基本上可以分为 3 类:
在本章将主要介绍行为克隆方法和生成式对抗模仿学习方法。尽管逆强化学习有良好的学术贡献,但由于其计算复杂度较高,实际应用的价值较小。
15.2 行为克隆
行为克隆(BC)就是直接使用监督学习方法,将专家数据中( s t , a t ) (s_t,a_t) ( s t , a t ) 的s t s_t s t 看作样本输入,a t a_t a t 视为标签,学习的目标为
θ ∗ = arg min θ E ( s , a ) ∼ B [ L ( π θ ( s ) ) , a ] \theta^{*} = \argmin_{\theta} \mathbb{E}_{(s,a)\sim B} \Big[\mathcal{L}(\pi_\theta(s)), a\Big]
θ ∗ = θ arg min E ( s , a ) ∼ B [ L ( π θ ( s )) , a ]
其中,B B B 是专家的数据集,L \mathcal{L} L 是对应监督学习框架下的损失函数。若动作是离散的,该损失函数可以是最大似然估计得到的。若动作是连续的,该损失函数可以是均方误差函数。
在训练数据量比较大的时候,BC 能够很快地学习到一个不错的策略。例如,围棋人工智能 AlphaGo 就是首先在 16 万盘棋局的 3000 万次落子数据中学习人类选手是如何下棋的,仅仅凭这个行为克隆方法,AlphaGo 的棋力就已经超过了很多业余围棋爱好者。由于 BC 的实现十分简单,因此在很多实际场景下它都可以作为策略预训练的方法。BC 能使得策略无须在较差时仍然低效地通过和环境交互来探索较好的动作,而是通过模仿专家智能体的行为数据来快速达到较高水平,为接下来的强化学习创造一个高起点。
BC 也存在很大的局限性,该局限在数据量比较小的时候犹为明显。具体来说,由于通过 BC 学习得到的策略只是拿小部分专家数据进行训练,因此 BC 只能在专家数据的状态分布下预测得比较准。然而,强化学习面对的是一个序贯决策问题,通过 BC 学习得到的策略在和环境交互过程中不可能完全学成最优,只要存在一点偏差,就有可能导致下一个遇到的状态是在专家数据中没有见过的。此时,由于没有在此状态(或者比较相近的状态)下训练过,策略可能就会随机选择一个动作,这会导致下一个状态进一步偏离专家策略遇到的的数据分布。最终,该策略在真实环境下不能得到比较好的效果,这被称为行为克隆的复合误差(compounding error)问题,如图 15-1 所示。
图15-1 行为克隆带来的复合误差问题
15.3 生成式对抗模仿学习
生成式对抗模仿学习 (generative adversarial imitation learning,GAIL)是 2016 年由斯坦福大学研究团队提出的基于生成式对抗网络的模仿学习,它诠释了生成式对抗网络的本质其实就是模仿学习。GAIL 实质上是模仿了专家策略的占用度量ρ E ( s , a ) \rho_E(s,a) ρ E ( s , a ) ,即尽量使得策略在环境中的所有状态动作对( s , a ) (s,a) ( s , a ) 的占用度量ρ π ( s , a ) \rho_\pi(s,a) ρ π ( s , a ) 和专家策略的占用度量ρ E ( s , a ) \rho_E(s,a) ρ E ( s , a ) 一致。为了达成这个目标,策略需要和环境进行交互,收集下一个状态的信息并进一步做出动作。这一点和 BC 不同,BC 完全不需要和环境交互。GAIL 算法中有一个判别器和一个策略,策略π \pi π 就相当于是生成式对抗网络中的生成器 (generator),给定一个状态,策略会输出这个状态下应该采取的动作,而判别器(discriminator)D D D 将状态动作对( s , a ) (s,a) ( s , a ) 作为输入,输出一个 0 0 0 到 1 1 1 之间的实数,表示判别器认为该状态动作对( s , a ) (s,a) ( s , a ) 是来自智能体策略而非专家的概率。判别器D D D 的目标是尽量将专家数据的输出靠近 0 0 0 ,将模仿者策略的输出靠近 1 1 1 ,这样就可以将两组数据分辨开来。于是,判别器D D D 的损失函数为
L ( ϕ ) = − E ρ π [ log D ϕ ( s , a ) ] − E ρ E [ log ( 1 − D ϕ ( s , a ) ) ] \mathcal{L}(\phi) = -\mathbb{E}_{\rho_\pi} [\log D_\phi(s,a)] - \mathbb{E}_{\rho_E}[\log(1 - D_\phi(s,a))]
L ( ϕ ) = − E ρ π [ log D ϕ ( s , a )] − E ρ E [ log ( 1 − D ϕ ( s , a ))]
其中ϕ \phi ϕ 是判别器D D D 的参数。有了判别器D D D 之后,模仿者策略的目标就是其交互产生的轨迹能被判别器误认为专家轨迹。于是,我们可以用判别器D D D 的输出来作为奖励函数来训练模仿者策略。具体来说,若模仿者策略在环境中采样到状态s s s ,并且采取动作a a a ,此时该状态动作对( s , a ) (s,a) ( s , a ) 会输入到判别器D D D 中,输出D ( s , a ) D(s,a) D ( s , a ) 的值,然后将奖励设置为r ( s , a ) = − log D ( s , a ) r(s,a)=-\log D(s,a) r ( s , a ) = − log D ( s , a ) 。于是,我们可以用任意强化学习算法,使用这些数据继续训练模仿者策略。最后,在对抗过程不断进行后,模仿者策略生成的数据分布将接近真实的专家数据分布,达到模仿学习的目标。GAIL 的优化目标如图 15-2 所示。
图15-2 GAIL 的优化目标
第 3 章介绍过一个策略和给定 MDP 交互的占用度量呈一一对应的关系。因此,模仿学习的本质就是通过更新策略使其占用度量尽量靠近专家的占用度量,而这正是 GAIL 的训练目标。由于一旦策略改变,其占用度量就会改变,因此为了训练好最新的判别器,策略需要不断和环境做交互,采样出最新的状态动作对样本。
15.4 代码实践
15.4.1 生成专家数据
首先,我们需要有一定量的专家数据,为此,预先通过 PPO 算法训练出一个表现良好的专家模型,再利用专家模型生成专家数据。本次代码实践的环境是 CartPole-v0,以下是 PPO 代码内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 import gymimport torchimport torch.nn.functional as Fimport torch.nn as nnimport numpy as npimport matplotlib.pyplot as pltfrom tqdm import tqdmimport randomimport rl_utilsclass PolicyNet (torch.nn.Module): def __init__ (self, state_dim, hidden_dim, action_dim ): super (PolicyNet, self).__init__() self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, action_dim) def forward (self, x ): x = F.relu(self.fc1(x)) return F.softmax(self.fc2(x), dim=1 )class ValueNet (torch.nn.Module): def __init__ (self, state_dim, hidden_dim ): super (ValueNet, self).__init__() self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, 1 ) def forward (self, x ): x = F.relu(self.fc1(x)) return self.fc2(x)class PPO : ''' PPO算法,采用截断方式 ''' def __init__ (self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device ): self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device) self.critic = ValueNet(state_dim, hidden_dim).to(device) self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr) self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr) self.gamma = gamma self.lmbda = lmbda self.epochs = epochs self.eps = eps self.device = device def take_action (self, state ): state = torch.tensor([state], dtype=torch.float ).to(self.device) probs = self.actor(state) action_dist = torch.distributions.Categorical(probs) action = action_dist.sample() return action.item() def update (self, transition_dict ): states = torch.tensor(transition_dict['states' ], dtype=torch.float ).to(self.device) actions = torch.tensor(transition_dict['actions' ]).view(-1 , 1 ).to( self.device) rewards = torch.tensor(transition_dict['rewards' ], dtype=torch.float ).view(-1 , 1 ).to(self.device) next_states = torch.tensor(transition_dict['next_states' ], dtype=torch.float ).to(self.device) dones = torch.tensor(transition_dict['dones' ], dtype=torch.float ).view(-1 , 1 ).to(self.device) td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones) td_delta = td_target - self.critic(states) advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device) old_log_probs = torch.log(self.actor(states).gather(1 , actions)).detach() for _ in range (self.epochs): log_probs = torch.log(self.actor(states).gather(1 , actions)) ratio = torch.exp(log_probs - old_log_probs) surr1 = ratio * advantage surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage actor_loss = torch.mean(-torch.min (surr1, surr2)) critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach())) self.actor_optimizer.zero_grad() self.critic_optimizer.zero_grad() actor_loss.backward() critic_loss.backward() self.actor_optimizer.step() self.critic_optimizer.step() actor_lr = 1e-3 critic_lr = 1e-2 num_episodes = 250 hidden_dim = 128 gamma = 0.98 lmbda = 0.95 epochs = 10 eps = 0.2 device = torch.device("cuda" ) if torch.cuda.is_available() else torch.device("cpu" ) env_name = 'CartPole-v0' env = gym.make(env_name) env.seed(0 ) torch.manual_seed(0 ) state_dim = env.observation_space.shape[0 ] action_dim = env.action_space.n ppo_agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device) return_list = rl_utils.train_on_policy_agent(env, ppo_agent, num_episodes)
1 2 3 4 5 6 7 8 9 10 Iteration 0: 100%|██████████| 25/25 [00:00<00:00, 29.67it/s, episode=20, return=40.700] Iteration 1: 100%|██████████| 25/25 [00:01<00:00, 15.19it/s, episode=45, return=182.800] Iteration 2: 100%|██████████| 25/25 [00:01<00:00, 14.11it/s, episode=70, return=176.100] Iteration 3: 100%|██████████| 25/25 [00:01<00:00, 14.44it/s, episode=95, return=191.500] Iteration 4: 100%|██████████| 25/25 [00:01<00:00, 14.45it/s, episode=120, return=151.300] Iteration 5: 100%|██████████| 25/25 [00:02<00:00, 12.15it/s, episode=145, return=200.000] Iteration 6: 100%|██████████| 25/25 [00:01<00:00, 13.47it/s, episode=170, return=200.000] Iteration 7: 100%|██████████| 25/25 [00:01<00:00, 13.20it/s, episode=195, return=200.000] Iteration 8: 100%|██████████| 25/25 [00:01<00:00, 14.43it/s, episode=220, return=188.100] Iteration 9: 100%|██████████| 25/25 [00:01<00:00, 13.13it/s, episode=245, return=200.000]
接下来开始生成专家数据。因为车杆环境比较简单,我们只生成一条轨迹,并且从中采样 30 个状态动作对( s , a ) (s,a) ( s , a ) 样本。我们只用这 30 个专家数据样本来训练模仿策略。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def sample_expert_data (n_episode ): states = [] actions = [] for episode in range (n_episode): state = env.reset() done = False while not done: action = ppo_agent.take_action(state) states.append(state) actions.append(action) next_state, reward, done, _ = env.step(action) state = next_state return np.array(states), np.array(actions) env.seed(0 ) torch.manual_seed(0 ) random.seed(0 ) n_episode = 1 expert_s, expert_a = sample_expert_data(n_episode) n_samples = 30 random_index = random.sample(range (expert_s.shape[0 ]), n_samples) expert_s = expert_s[random_index] expert_a = expert_a[random_index]
15.4.2 行为克隆的代码实践
在 BC 中,我们将专家数据中的( s t , a t ) (s_t,a_t) ( s t , a t ) 中的a t a_t a t 视为标签,BC 则转化成监督学习中经典的分类问题,采用最大似然估计的训练方法可得到分类结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 class BehaviorClone : def __init__ (self, state_dim, hidden_dim, action_dim, lr ): self.policy = PolicyNet(state_dim, hidden_dim, action_dim).to(device) self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr) def learn (self, states, actions ): states = torch.tensor(states, dtype=torch.float ).to(device) actions = torch.tensor(actions).view(-1 , 1 ).to(device) log_probs = torch.log(self.policy(states).gather(1 , actions)) bc_loss = torch.mean(-log_probs) self.optimizer.zero_grad() bc_loss.backward() self.optimizer.step() def take_action (self, state ): state = torch.tensor([state], dtype=torch.float ).to(device) probs = self.policy(state) action_dist = torch.distributions.Categorical(probs) action = action_dist.sample() return action.item()def test_agent (agent, env, n_episode ): return_list = [] for episode in range (n_episode): episode_return = 0 state = env.reset() done = False while not done: action = agent.take_action(state) next_state, reward, done, _ = env.step(action) state = next_state episode_return += reward return_list.append(episode_return) return np.mean(return_list) env.seed(0 ) torch.manual_seed(0 ) np.random.seed(0 ) lr = 1e-3 bc_agent = BehaviorClone(state_dim, hidden_dim, action_dim, lr) n_iterations = 1000 batch_size = 64 test_returns = []with tqdm(total=n_iterations, desc="进度条" ) as pbar: for i in range (n_iterations): sample_indices = np.random.randint(low = 0 , high = expert_s.shape[0 ], size=batch_size) bc_agent.learn(expert_s[sample_indices], expert_a[sample_indices]) current_return = test_agent(bc_agent, env, 5 ) test_returns.append(current_return) if (i + 1 ) % 10 == 0 : pbar.set_postfix({'return' : '%.3f' % np.mean(test_returns[-10 :])}) pbar.update(1 )
1 进度条: 100%|██████████| 1000/1000 [03:05<00:00, 5.40it/s, return=199.320]
1 2 3 4 5 6 iteration_list = list (range (len (test_returns))) plt.plot(iteration_list, test_returns) plt.xlabel('Iterations' ) plt.ylabel('Returns' ) plt.title('BC on {}' .format (env_name)) plt.show()
我们发现 BC 无法学习到最优策略(不同设备运行结果可能会有不同),这主要是因为在数据量比较少的情况下,学习容易发生过拟合。
15.4.3 生成式对抗模仿学习的代码实践
接下来我们实现 GAIL 的代码。
首先实现判别器模型,其模型架构为一个两层的全连接网络,模型输入为一个状态动作对,输出一个概率标量。
1 2 3 4 5 6 7 8 9 10 class Discriminator (nn.Module): def __init__ (self, state_dim, hidden_dim, action_dim ): super (Discriminator, self).__init__() self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, 1 ) def forward (self, x, a ): cat = torch.cat([x, a], dim=1 ) x = F.relu(self.fc1(cat)) return torch.sigmoid(self.fc2(x))
接下来正式实现 GAIL 的代码。每一轮迭代中,GAIL 中的策略和环境交互,采样新的状态动作对。基于专家数据和策略新采样的数据,首先训练判别器,然后将判别器的输出转换为策略的奖励信号,指导策略用 PPO 算法做训练。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 class GAIL : def __init__ (self, agent, state_dim, action_dim, hidden_dim, lr_d ): self.discriminator = Discriminator(state_dim, hidden_dim, action_dim).to(device) self.discriminator_optimizer = torch.optim.Adam(self.discriminator.parameters(), lr_d) self.agent = agent def learn (self, expert_s, expert_a, agent_s, agent_a, next_s, dones ): expert_states = torch.tensor(expert_s, dtype=torch.float ).to(device) expert_actions = torch.tensor(expert_a).to(device) agent_states = torch.tensor(agent_s, dtype=torch.float ).to(device) agent_actions = torch.tensor(agent_a).to(device) expert_actions = F.one_hot(expert_actions, num_classes=2 ).float () agent_actions = F.one_hot(agent_actions, num_classes=2 ).float () expert_prob = self.discriminator(expert_states, expert_actions) agent_prob = self.discriminator(agent_states, agent_actions) discriminator_loss = nn.BCELoss()(agent_prob, torch.ones_like(agent_prob)) + nn.BCELoss()(expert_prob, torch.zeros_like(expert_prob)) self.discriminator_optimizer.zero_grad() discriminator_loss.backward() self.discriminator_optimizer.step() rewards = -torch.log(agent_prob).detach().cpu().numpy() transition_dict = { 'states' : agent_s, 'actions' : agent_a, 'rewards' : rewards, 'next_states' : next_s, 'dones' : dones } self.agent.update(transition_dict) env.seed(0 ) torch.manual_seed(0 ) lr_d = 1e-3 agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device) gail = GAIL(agent, state_dim, action_dim, hidden_dim, lr_d) n_episode = 500 return_list = []with tqdm(total=n_episode, desc="进度条" ) as pbar: for i in range (n_episode): episode_return = 0 state = env.reset() done = False state_list = [] action_list = [] next_state_list = [] done_list = [] while not done: action = agent.take_action(state) next_state, reward, done, _ = env.step(action) state_list.append(state) action_list.append(action) next_state_list.append(next_state) done_list.append(done) state = next_state episode_return += reward return_list.append(episode_return) gail.learn(expert_s, expert_a, state_list, action_list, next_state_list, done_list) if (i + 1 ) % 10 == 0 : pbar.set_postfix({'return' : '%.3f' % np.mean(return_list[-10 :])}) pbar.update(1 )
1 进度条: 100%|██████████| 500/500 [00:35<00:00, 14.20it/s, return=200.000]
1 2 3 4 5 6 iteration_list = list (range (len (return_list))) plt.plot(iteration_list, return_list) plt.xlabel('Episodes' ) plt.ylabel('Returns' ) plt.title('GAIL on {}' .format (env_name)) plt.show()
通过上面两个实验的对比我们可以直观地感受到,在数据样本有限的情况下,BC 不能学习到最优策略,但是 GAIL 在相同的专家数据下可以取得非常好的结果。这一方面归因于 GAIL 的训练目标(拉近策略和专家的占用度量)十分贴合模仿学习任务的目标,避免了 BC 中的复合误差问题;另一方面得益于 GAIL 训练中,策略可以和环境交互出更多的数据,以此训练判别器,进而生成对基于策略“量身定做”的指导奖励信号。
15.5 总结
本章讲解了模仿学习的基础概念,即根据一些专家数据来学习一个策略,数据中不包含奖励,和环境交互也不能获得奖励。本章还介绍了模仿学习中的两类方法,分别是行为克隆(BC)和生成式对抗模仿学习(GAIL)。通过实验对比发现,在少量专家数据的情况下,GAIL 能获得更好的效果。
此外,逆向强化学习(IRL)也是模仿学习中的重要方法,它假设环境的奖励函数应该使得专家轨迹获得最高的奖励值,进而学习背后的奖励函数,最后基于该奖励函数做正向强化学习,从而得到模仿策略。感兴趣的读者可以查阅相关文献进行学习。
15.6 参考文献
[1] SYED U, BOWLING M, SCHAPIRE R E. Apprenticeship learning using linear programming [C]// Proceedings of the 25th international conference on Machine learning, 2008: 1032-1039.
[2] HO J, ERMON S. Generative adversarial imitation learning [J]. Advances in neural information processing systems 2016, 29: 4565-4573.
[3] ABBEEL P, NG A Y. Apprenticeship learning via inverse reinforcement learning [C] // Proceedings of the twenty-first international conference on machine learning, 2004.
16 模型预测控制
16.1 简介
之前几章介绍了基于值函数的方法 DQN、基于策略的方法 REINFORCE 以及两者结合的方法 Actor-Critic。它们都是无模型 (model-free)的方法,即没有建立一个环境模型来帮助智能体决策。而在深度强化学习领域下,基于模型 (model-based)的方法通常用神经网络学习一个环境模型,然后利用该环境模型来帮助智能体训练和决策。利用环境模型帮助智能体训练和决策的方法有很多种,例如可以用与之前的 Dyna 类似的思想生成一些数据来加入策略训练中。本章要介绍的模型预测控制(model predictive control,MPC)算法并不构建一个显式的策略,只根据环境模型来选择当前步要采取的动作。
16.2 打靶法
首先,让我们用一个形象的比喻来帮助理解模型预测控制方法。假设我们在下围棋,现在根据棋盘的布局,我们要选择现在落子的位置。一个优秀的棋手会根据目前局势来推演落子几步可能发生的局势,然后选择局势最好的一种情况来决定当前落子位置。
模型预测控制方法就是这样一种迭代的、基于模型的控制方法。值得注意的是,MPC 方法中不存在一个显式的策略。具体而言,MPC 方法在每次采取动作时,首先会生成一些候选动作序列,然后根据当前状态来确定每一条候选序列能得到多好的结果,最终选择结果最好的那条动作序列的第一个动作来执行。因此,在使用 MPC 方法时,主要在两个过程中迭代,一是根据历史数据学习环境模型P ^ ( s , a ) \hat{P}(s,a) P ^ ( s , a ) ,二是在和真实环境交互过程中用环境模型来选择动作。
首先,我们定义模型预测方法的目标。在第k k k 步时,我们要想做的就是最大化智能体的累积奖励,具体来说就是:
arg max a k : k + H ∑ t = k k + H r ( s t , a t ) s.t. s t + 1 = P ^ ( s t , a t ) \begin{aligned}
&\argmax_{a_{k:k+H}} \sum_{t=k}^{k+H} r(s_t,a_t)\\
&\text{s.t. } s_{t+1} = \hat{P}(s_t,a_t)
\end{aligned}
a k : k + H arg max t = k ∑ k + H r ( s t , a t ) s.t. s t + 1 = P ^ ( s t , a t )
其中H H H 为推演的长度,arg max a k : k + H \displaystyle\argmax_{a_{k:k+H}} a k : k + H arg max 表示从所有动作序列中选取累积奖励最大的序列。我们每次取最优序列中的第一个动作a k a_k a k 来与环境交互。MPC 方法中的一个关键是如何生成一些候选动作序列,候选动作生成的好坏将直接影响到 MPC 方法得到的动作。生成候选动作序列的过程我们称为打靶 (shooting).
16.2.1 随机打靶法
随机打靶法 (random shooting method)的做法便是随机生成N N N 条动作序列,即在生成每条动作序列的每一个动作时,都是从动作空间中随机采样一个动作,最终组合成N N N 条长度为K K K 的动作序列。
对于一些简单的环境,这个方法不但十分简单,而且效果还不错。那么,能不能在随机的基础上,根据已有的结果做得更好一些呢?接下来,我们来介绍另外一种打靶法:交叉熵方法 。
16.2.2 交叉熵方法
交叉熵方法 (cross entropy method,CEM)是一种进化策略方法,它的核心思想是维护一个带参数的分布,根据每次采样的结果来更新分布中的参数,使得分布中能获得较高累积奖励的动作序列的概率比较高。相比于随机打靶法,交叉熵方法能够利用之前采样到的比较好的结果,在一定程度上减少采样到一些较差动作的概率,从而使得算法更加高效。对于一个与连续动作交互的环境来说,每次交互时交叉熵方法的做法如下:
for 次数e = 1 → E e=1\rightarrow E e = 1 → E do
从分布P ( A ) P(\mathbf{A}) P ( A ) 中选取N N N 条动作序列A 1 , ⋯ , A N \mathbf{A}_1,\cdots,\mathbf{A}_N A 1 , ⋯ , A N
对于每条动作序列A 1 , ⋯ , A N \mathbf{A}_1,\cdots,\mathbf{A}_N A 1 , ⋯ , A N ,用环境模型评估累积奖励
根据评估结果保留M M M 条最优的动作序列A i 1 , ⋯ , A i M \mathbf{A}_{i_1},\cdots,\mathbf{A}_{i_M} A i 1 , ⋯ , A i M
用这些动作序列A i 1 , ⋯ , A i M \mathbf{A}_{i_1},\cdots,\mathbf{A}_{i_M} A i 1 , ⋯ , A i M 去更新分布p ( A ) p(\mathbf{A}) p ( A )
end for
计算所有最优动作序列的第一个动作的均值,作为当前时刻采取的动作
我们可以使用如下的代码来实现交叉熵方法,其中将采用截断正态分布。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import numpy as npfrom scipy.stats import truncnormimport gymimport itertoolsimport torchimport torch.nn as nnimport torch.nn.functional as Fimport collectionsimport matplotlib.pyplot as pltclass CEM : def __init__ (self, n_sequence, elite_ratio, fake_env, upper_bound, lower_bound ): self.n_sequence = n_sequence self.elite_ratio = elite_ratio self.upper_bound = upper_bound self.lower_bound = lower_bound self.fake_env = fake_env def optimize (self, state, init_mean, init_var ): mean, var = init_mean, init_var X = truncnorm(-2 , 2 , loc=np.zeros_like(mean), scale=np.ones_like(var)) state = np.tile(state, (self.n_sequence, 1 )) for _ in range (5 ): lb_dist, ub_dist = mean - self.lower_bound, self.upper_bound - mean constrained_var = np.minimum( np.minimum( np.square(lb_dist / 2 ), np.square(ub_dist / 2 ) ), var ) action_sequences = [X.rvs() for _ in range (self.n_sequence)] * np.sqrt(constrained_var) + mean returns = self.fake_env.propagate(state, action_sequences)[:, 0 ] elites = action_sequences[np.argsort(returns)][-int (self.elite_ratio * self.n_sequence):] new_mean = np.mean(elites, axis=0 ) new_var = np.var(elites, axis=0 ) mean = 0.1 * mean + 0.9 * new_mean var = 0.1 * var + 0.9 * new_var return mean
16.3 PETS 算法
带有轨迹采样的概率集成(probabilistic ensembles with trajectory sampling,PETS)是一种使用 MPC 的基于模型的强化学习算法。在 PETS 中,环境模型采用了集成学习的方法,即会构建多个环境模型,然后用这多个环境模型来进行预测,最后使用 CEM 进行模型预测控制。接下来,我们来详细介绍模型构建与模型预测的方法。
在强化学习中,与智能体交互的环境是一个动态系统,所以拟合它的环境模型也通常是一个动态模型。我们通常认为一个系统中有两种不确定性,分别是偶然不确定性 (aleatoric uncertainty)和认知不确定性 (epistemic uncertainty)。偶然不确定性是由于系统中本身存在的随机性引起的,而认知不确定性是由“见”过的数据较少导致的自身认知的不足而引起的,如图 16-1 所示。
图16-1 偶然不确定性和认知不确定性
在 PET 算法中,环境模型的构建会同时考虑到这两种不确定性。首先,我们定义环境模型的输出为一个高斯分布,用来捕捉偶然不确定性。令环境模型为P ^ \hat{P} P ^ ,其参数为θ \theta θ ,那么基于当前状态动作对( s t , a t ) (s_t,a_t) ( s t , a t ) ,下一个状态s t s_t s t 的分布可以写为
P ^ ( s t , a t ) = N ( μ θ ( s t , a t ) , Σ θ ( s t , a t ) ) \hat{P}(s_t,a_t) = \mathcal{N}\Big( \mu_\theta(s_t,a_t), \Sigma_\theta(s_t,a_t) \Big)
P ^ ( s t , a t ) = N ( μ θ ( s t , a t ) , Σ θ ( s t , a t ) )
这里我们可以采用神经网络来构建μ θ \mu_\theta μ θ 和Σ θ \Sigma_\theta Σ θ 。这样,神经网络的损失函数则为
L ( θ ) = ∑ n = 1 N [ μ θ ( s n , a n ) − s n + 1 ] T Σ θ − 1 ( s t , a t ) [ μ θ ( s n , a n ) − s n + 1 ] + log det Σ θ ( s n , a n ) \mathcal{L}(\theta) = \sum_{n=1}^N \Big[ \mu_\theta(s_n,a_n) - s_{n+1} \Big]^T
\Sigma_\theta^{-1}(s_t,a_t) \Big[ \mu_\theta(s_n,a_n) - s_{n+1} \Big] + \log \det \Sigma_\theta(s_n,a_n)
L ( θ ) = n = 1 ∑ N [ μ θ ( s n , a n ) − s n + 1 ] T Σ θ − 1 ( s t , a t ) [ μ θ ( s n , a n ) − s n + 1 ] + log det Σ θ ( s n , a n )
这样我们就得到了一个由神经网络表示的环境模型。在此基础之上,我们选择用集成 (ensemble)方法来捕捉认知不确定性。具体而言,我们构建B B B 个网络框架一样的神经网络,它们的输入都是状态动作对,输出都是下一个状态的高斯分布的均值向量和协方差矩阵。但是它们的参数采用不同的随机初始化方式,并且当每次训练时,会从真实数据中随机采样不同的数据来训练。
有了环境模型的集成后,MPC 算法会用其来预测奖励和下一个状态。具体来说,每一次预测会从B B B 个模型中挑选一个来进行预测,因此一条轨迹的采样会使用到多个环境模型,如图 16-2 所示。
图16-2 PETS 算法利用各个环境模型选取动作
16.4 PETS 算法实践
首先,为了搭建这样一个较为复杂的模型,我们定义模型中每一层的构造。在定义时就必须考虑每一层都是一个集成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 device = torch.device("cuda" ) if torch.cuda.is_available() else torch.device("cpu" )class Swish (nn.Module): ''' Swish激活函数 ''' def __init__ (self ): super (Swish, self).__init__() def forward (self, x ): return x * torch.sigmoid(x)def init_weights (m ): ''' 初始化模型权重 ''' def truncated_normal_init (t, mean=0.0 , std=0.01 ): torch.nn.init.normal_(t, mean=mean, std=std) while True : cond = (t < mean - 2 * std) | (t > mean + 2 * std) if not torch.sum (cond): break t = torch.where( cond, torch.nn.init.normal_( torch.ones(t.shape, device=device), mean=mean, std=std ), t ) return t if type (m) == nn.Linear or isinstance (m, FCLayer): truncated_normal_init(m.weight, std=1 / (2 * np.sqrt(m._input_dim))) m.bias.data.fill_(0.0 )class FCLayer (nn.Module): ''' 集成之后的全连接层 ''' def __init__ (self, input_dim, output_dim, ensemble_size, activation ): super (FCLayer, self).__init__() self._input_dim, self._output_dim = input_dim, output_dim self.weight = nn.Parameter(torch.Tensor(ensemble_size, input_dim, output_dim).to(device)) self._activation = activation self.bias = nn.Parameter(torch.Tensor(ensemble_size, output_dim).to(device)) def forward (self, x ): return self._activation( torch.add(torch.bmm(x, self.weight), self.bias[:, None , :]))
接着,使用高斯分布的概率模型来定义一个集成模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 class EnsembleModel (nn.Module): ''' 环境模型集成 ''' def __init__ ( self, state_dim, action_dim, ensemble_size=5 , learning_rate=1e-3 ): super (EnsembleModel, self).__init__() self._output_dim = (state_dim + 1 ) * 2 self._max_logvar = nn.Parameter( (torch.ones((1 , self._output_dim // 2 )).float () / 2 ).to(device), requires_grad=False ) self._min_logvar = nn.Parameter( (-torch.ones((1 , self._output_dim // 2 )).float () * 10 ).to(device), requires_grad=False ) self.layer1 = FCLayer(state_dim + action_dim, 200 , ensemble_size, Swish()) self.layer2 = FCLayer(200 , 200 , ensemble_size, Swish()) self.layer3 = FCLayer(200 , 200 , ensemble_size, Swish()) self.layer4 = FCLayer(200 , 200 , ensemble_size, Swish()) self.layer5 = FCLayer(200 , self._output_dim, ensemble_size, nn.Identity()) self.apply(init_weights) self.optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate) def forward (self, x, return_log_var=False ): ret = self.layer5(self.layer4(self.layer3(self.layer2(self.layer1(x))))) mean = ret[:, :, :self._output_dim // 2 ] logvar = self._max_logvar - F.softplus(self._max_logvar - ret[:, :, self._output_dim // 2 :]) logvar = self._min_logvar + F.softplus(logvar - self._min_logvar) return mean, logvar if return_log_var else torch.exp(logvar) def loss (self, mean, logvar, labels, use_var_loss=True ): inverse_var = torch.exp(-logvar) if use_var_loss: mse_loss = torch.mean( torch.mean(torch.pow (mean - labels, 2 ) * inverse_var, dim=-1 ), dim=-1 ) var_loss = torch.mean(torch.mean(logvar, dim=-1 ), dim=-1 ) total_loss = torch.sum (mse_loss) + torch.sum (var_loss) else : mse_loss = torch.mean(torch.pow (mean - labels, 2 ), dim=(1 , 2 )) total_loss = torch.sum (mse_loss) return total_loss, mse_loss def train (self, loss ): self.optimizer.zero_grad() loss += 0.01 * torch.sum (self._max_logvar) - 0.01 * torch.sum (self._min_logvar) loss.backward() self.optimizer.step()
接下来,我们定义一个EnsembleDynamicsModel
的类,把模型集成的训练设计得更加精细化。具体而言,我们并不会选择模型训练的轮数,而是在每次训练的时候将一部分数据单独取出来,用于验证模型的表现,在 5 次没有获得表现提升时就结束训练。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 class EnsembleDynamicsModel : ''' 环境模型集成,加入精细化的训练 ''' def __init__ (self, state_dim, action_dim, num_network=5 ): self._num_network = num_network self._state_dim, self._action_dim = state_dim, action_dim self.model = EnsembleModel( state_dim, action_dim, ensemble_size=num_network ) self._epoch_since_last_update = 0 def train ( self, inputs, labels, batch_size=64 , holdout_ratio=0.1 , max_iter=20 ): permutation = np.random.permutation(inputs.shape[0 ]) inputs, labels = inputs[permutation], labels[permutation] num_holdout = int (inputs.shape[0 ] * holdout_ratio) train_inputs, train_labels = inputs[num_holdout:], labels[num_holdout:] holdout_inputs, holdout_labels = inputs[:num_holdout], labels[:num_holdout] holdout_inputs = torch.from_numpy(holdout_inputs).float ().to(device) holdout_labels = torch.from_numpy(holdout_labels).float ().to(device) holdout_inputs = holdout_inputs[None , :, :].repeat([self._num_network, 1 , 1 ]) holdout_labels = holdout_labels[None , :, :].repeat([self._num_network, 1 , 1 ]) self._snapshots = {i: (None , 1e10 ) for i in range (self._num_network)} for epoch in itertools.count(): train_index = np.vstack([ np.random.permutation(train_inputs.shape[0 ]) for _ in range (self._num_network) ]) for batch_start_pos in range (0 , train_inputs.shape[0 ], batch_size): batch_index = train_index[:, batch_start_pos:batch_start_pos + batch_size] train_input = torch.from_numpy(train_inputs[batch_index]).float ().to(device) train_label = torch.from_numpy(train_labels[batch_index]).float ().to(device) mean, logvar = self.model(train_input, return_log_var=True ) loss, _ = self.model.loss(mean, logvar, train_label) self.model.train(loss) with torch.no_grad(): mean, logvar = self.model(holdout_inputs, return_log_var=True ) _, holdout_losses = self.model.loss( mean, logvar, holdout_labels, use_var_loss=False ) holdout_losses = holdout_losses.cpu() break_condition = self._save_best(epoch, holdout_losses) if break_condition or epoch > max_iter: break def _save_best (self, epoch, losses, threshold=0.1 ): updated = False for i in range (len (losses)): current = losses[i] _, best = self._snapshots[i] improvement = (best - current) / best if improvement > threshold: self._snapshots[i] = (epoch, current) updated = True self._epoch_since_last_update = 0 if updated else self._epoch_since_last_update + 1 return self._epoch_since_last_update > 5 def predict (self, inputs, batch_size=64 ): mean, var = [], [] for i in range (0 , inputs.shape[0 ], batch_size): input = torch.from_numpy( inputs[i:min (i + batch_size, inputs.shape[0 ])] ).float ().to(device) cur_mean, cur_var = self.model(input [None , :, :].repeat([self._num_network, 1 , 1 ]), return_log_var=False ) mean.append(cur_mean.detach().cpu().numpy()) var.append(cur_var.detach().cpu().numpy()) return np.hstack(mean), np.hstack(var)
有了环境模型之后,我们就可以定义一个FakeEnv,主要用于实现给定状态和动作,用模型集成来进行预测。该功能会用在 MPC 算法中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class FakeEnv : def __init__ (self, model ): self.model = model def step (self, obs, act ): inputs = np.concatenate((obs, act), axis=-1 ) ensemble_model_means, ensemble_model_vars = self.model.predict(inputs) ensemble_model_means[:, :, 1 :] += obs.numpy() ensemble_model_stds = np.sqrt(ensemble_model_vars) ensemble_samples = ensemble_model_means + np.random.normal( size=ensemble_model_means.shape ) * ensemble_model_stds num_models, batch_size, _ = ensemble_model_means.shape models_to_use = np.random.choice( [i for i in range (self.model._num_network)], size=batch_size ) batch_inds = np.arange(0 , batch_size) samples = ensemble_samples[models_to_use, batch_inds] rewards, next_obs = samples[:, :1 ], samples[:, 1 :] return rewards, next_obs def propagate (self, obs, actions ): with torch.no_grad(): obs = np.copy(obs) total_reward = np.expand_dims(np.zeros(obs.shape[0 ]), axis=-1 ) obs, actions = torch.as_tensor(obs), torch.as_tensor(actions) for i in range (actions.shape[1 ]): action = torch.unsqueeze(actions[:, i], 1 ) rewards, next_obs = self.step(obs, action) total_reward += rewards obs = torch.as_tensor(next_obs) return total_reward
接下来定义经验回放池的类Replay Buffer
。与之前的章节对比,此处经验回放缓冲区会额外实现一个返回所有数据的函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class ReplayBuffer : def __init__ (self, capacity ): self.buffer = collections.deque(maxlen=capacity) def add (self, state, action, reward, next_state, done ): self.buffer.append((state, action, reward, next_state, done)) def size (self ): return len (self.buffer) def return_all_samples (self ): all_transitions = list (self.buffer) state, action, reward, next_state, done = zip (*all_transitions) return np.array(state), action, reward, np.array(next_state), done
接下来是 PETS
算法的主体部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 class PETS : ''' PETS算法 ''' def __init__ (self, env, replay_buffer, n_sequence, elite_ratio, plan_horizon, num_episodes ): self._env = env self._env_pool = replay_buffer obs_dim = env.observation_space.shape[0 ] self._action_dim = env.action_space.shape[0 ] self._model = EnsembleDynamicsModel(obs_dim, self._action_dim) self._fake_env = FakeEnv(self._model) self.upper_bound = env.action_space.high[0 ] self.lower_bound = env.action_space.low[0 ] self._cem = CEM(n_sequence, elite_ratio, self._fake_env, self.upper_bound, self.lower_bound) self.plan_horizon = plan_horizon self.num_episodes = num_episodes def train_model (self ): env_samples = self._env_pool.return_all_samples() obs = env_samples[0 ] actions = np.array(env_samples[1 ]) rewards = np.array(env_samples[2 ]).reshape(-1 , 1 ) next_obs = env_samples[3 ] inputs = np.concatenate((obs, actions), axis=-1 ) labels = np.concatenate((rewards, next_obs - obs), axis=-1 ) self._model.train(inputs, labels) def mpc (self ): mean = np.tile( (self.upper_bound + self.lower_bound) / 2.0 , self.plan_horizon ) var = np.tile( np.square(self.upper_bound - self.lower_bound) / 16 , self.plan_horizon ) obs, done, episode_return = self._env.reset(), False , 0 while not done: actions = self._cem.optimize(obs, mean, var) action = actions[:self._action_dim] next_obs, reward, done, _ = self._env.step(action) self._env_pool.add(obs, action, reward, next_obs, done) obs = next_obs episode_return += reward mean = np.concatenate([ np.copy(actions)[self._action_dim:], np.zeros(self._action_dim) ]) return episode_return def explore (self ): obs, done, episode_return = self._env.reset(), False , 0 while not done: action = self._env.action_space.sample() next_obs, reward, done, _ = self._env.step(action) self._env_pool.add(obs, action, reward, next_obs, done) obs = next_obs episode_return += reward return episode_return def train (self ): return_list = [] explore_return = self.explore() print ('episode: 1, return: %d' % explore_return) return_list.append(explore_return) for i_episode in range (self.num_episodes - 1 ): self.train_model() episode_return = self.mpc() return_list.append(episode_return) print ('episode: %d, return: %d' % (i_episode + 2 , episode_return)) return return_list
大功告成!让我们在倒立摆环境上试一下吧,以下代码需要一定的运行时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 buffer_size = 100000 n_sequence = 50 elite_ratio = 0.2 plan_horizon = 25 num_episodes = 10 env_name = 'Pendulum-v0' env = gym.make(env_name) replay_buffer = ReplayBuffer(buffer_size) pets = PETS(env, replay_buffer, n_sequence, elite_ratio, plan_horizon, num_episodes) return_list = pets.train() episodes_list = list (range (len (return_list))) plt.plot(episodes_list, return_list) plt.xlabel('Episodes' ) plt.ylabel('Returns' ) plt.title('PETS on {}' .format (env_name)) plt.show()
1 2 3 4 5 6 7 8 9 10 episode: 1, return: -985 episode: 2, return: -1384 episode: 3, return: -1006 episode: 4, return: -1853 episode: 5, return: -378 episode: 6, return: -123 episode: 7, return: -124 episode: 8, return: -122 episode: 9, return: -124 episode: 10, return: -125
可以看出,PETS 算法的效果非常好,但是由于每次选取动作都需要在环境模型上进行大量的模拟,因此运行速度非常慢。与 SAC 算法的结果进行对比可以看出,PETS 算法大大提高了样本效率,在比 SAC 算法的环境交互次数少得多的情况下就取得了差不多的效果。
16.5 总结
通过学习与实践,我们可以看出模型预测控制(MPC)方法有着其独特的优势,例如它不用构建和训练策略,可以更好地利用环境,可以进行更长步数的规划。但是 MPC 也有其局限性,例如模型在多步推演之后的准确性会大大降低,简单的控制策略对于复杂系统可能不够。MPC 还有一个更为严重的问题,即每次计算动作的复杂度太大,这使其在一些策略及时性要求较高的系统中应用就变得不太现实。
16.6 参考文献
[1] CHUA K, CALANDRA R, MCALLISTER R, et al. Deep reinforcement learning in a handful of trials using probabilistic dynamics models [J]. Advances in neural information processing systems, 2018: 31.
[2] LAKSHMINARAYANAN B, PRITZEL A, BLUNDELL C. Simple and scalable predictive uncertainty estimation using deep ensembles [J]. Advances in neural information processing systems, 2017: 30.
17 基于模型的策略优化
17.1 简介
第 16 章介绍的 PETS 算法是基于模型的强化学习算法中的一种,它没有显式构建一个策略(即一个从状态到动作的映射函数)。回顾一下之前介绍过的 Dyna-Q 算法,它也是一种基于模型的强化学习算法。但是 Dyna-Q 算法中的模型只存储之前遇到的数据,只适用于表格型环境。而在连续型状态和动作的环境中,我们需要像 PETS 算法一样学习一个用神经网络表示的环境模型,此时若继续利用 Dyna 的思想,可以在任意状态和动作下用环境模型来生成一些虚拟数据,这些虚拟数据可以帮助进行策略的学习。如此,通过和模型进行交互产生额外的虚拟数据,对真实环境中样本的需求量就会减少,因此通常会比无模型的强化学习方法具有更高的采样效率。本章将介绍这样一种算法——MBPO 算法。
17.2 MBPO 算法
基于模型的策略优化 (model-based policy optimization,MBPO)算法是加州大学伯克利分校的研究员在 2019 年的 NeurIPS 会议中提出的。随即 MBPO 成为深度强化学习中最重要的基于模型的强化学习算法之一。
MBPO 算法基于以下两个关键的观察: (1) 随着环境模型的推演步数变长,模型累积的复合误差会快速增加,使得环境模型得出的结果变得很不可靠; (2) 必须要权衡推演步数增加后模型增加的误差带来的负面作用与步数增加后使得训练的策略更优的正面作用,二者的权衡决定了推演的步数。
MBPO 算法在这两个观察的基础之上,提出只使用模型来从之前访问过的真实状态开始进行较短步数的推演,而非从初始状态开始进行完整的推演。这就是 MBPO 中的分支推演 (branched rollout)的概念,即在原来真实环境中采样的轨迹上面推演出新的“短分支”,如图 17-1 所示。这样做可以使模型的累积误差不至于过大,从而保证最后的采样效率和策略表现。
图17-1 分支推演示意图
MBPO 与第 6 章讲解的经典的 Dyna-Q 算法十分类似。Dyna-Q 采用的无模型强化学习部分是 Q-learning,而 MBPO 采用的是 SAC。此外,MBPO 算法中环境模型的构建和 PETS 算法中一致,都使用模型集成的方式,并且其中每一个环境模型的输出都是一个高斯分布。接下来,我们来看一下 MBPO 的具体算法框架。MBPO 算法会把真实环境样本作为分支推演的起点,使用模型进行一定步数的推演,并用推演得到的模型数据用来训练模型。
初始化策略π ϕ \pi_\phi π ϕ 、环境模型参数p θ p_\theta p θ 、真实环境数据集D env D_{\text{env}} D env 、模型数据集D model D_{\text{model}} D model
for 轮数n = 1 → N n=1\rightarrow N n = 1 → N do
通过环境数据来训练模型参数p θ p_\theta p θ
for 时间步t = 1 → T t=1\rightarrow T t = 1 → T do
根据策略π ϕ \pi_\phi π ϕ 与环境交互,并将交互的轨迹添加到D env D_{\text{env}} D env 中
for 模型推演次数e = 1 → E e=1\rightarrow E e = 1 → E do
从D env D_{\text{env}} D env 中均匀随机采样一个状态s t s_t s t
以s t s_t s t 为初始状态,在模型中使用策略π ϕ \pi_\phi π ϕ 进行k k k 步的推演,并将生成的轨迹添加到D model D_{\text{model}} D model 中
end for
for 梯度更新次数g = 1 → G g=1\rightarrow G g = 1 → G do
基于模型数据D model D_{\text{model}} D model ,使用 SAC 来更新策略参数π ϕ \pi_\phi π ϕ
end for
end for
end for
分支推演的长度k k k 是平衡样本效率和策略性能的重要超参数。接下来我们看看 MBPO 的代码,本章最后会给出关于 MBPO 的理论推导,可以指导参数k k k 的选取。
17.3 MBPO 代码实践
首先,我们先导入一些必要的包。
1 2 3 4 5 6 7 8 9 10 11 12 import gymfrom collections import namedtupleimport itertoolsfrom itertools import countimport torchimport torch.nn as nnimport torch.nn.functional as Ffrom torch.distributions.normal import Normalimport numpy as npimport collectionsimport randomimport matplotlib.pyplot as plt
MBPO 算法使用 SAC 算法来训练策略。和 SAC 算法相比,MBPO 多用了一些模型推演得到的数据来训练策略。要想了解 SAC 方法的详细过程,读者可以阅读第 14 章对应的内容。我们将 SAC 代码直接复制到此处。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 class PolicyNet (torch.nn.Module): def __init__ (self, state_dim, hidden_dim, action_dim, action_bound ): super (PolicyNet, self).__init__() self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc_mu = torch.nn.Linear(hidden_dim, action_dim) self.fc_std = torch.nn.Linear(hidden_dim, action_dim) self.action_bound = action_bound def forward (self, x ): x = F.relu(self.fc1(x)) mu = self.fc_mu(x) std = F.softplus(self.fc_std(x)) dist = Normal(mu, std) normal_sample = dist.rsample() log_prob = dist.log_prob(normal_sample) action = torch.tanh(normal_sample) log_prob = log_prob - torch.log(1 - torch.tanh(action).pow (2 ) + 1e-7 ) action = action * self.action_bound return action, log_probclass QValueNet (torch.nn.Module): def __init__ (self, state_dim, hidden_dim, action_dim ): super (QValueNet, self).__init__() self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, 1 ) def forward (self, x, a ): cat = torch.cat([x, a], dim=1 ) x = F.relu(self.fc1(cat)) return self.fc2(x) device = torch.device("cuda" ) if torch.cuda.is_available() else torch.device( "cpu" )class SAC : ''' 处理连续动作的SAC算法 ''' def __init__ (self, state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, alpha_lr, target_entropy, tau, gamma ): self.actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device) self.critic_1 = QValueNet(state_dim, hidden_dim, action_dim).to(device) self.critic_2 = QValueNet(state_dim, hidden_dim, action_dim).to(device) self.target_critic_1 = QValueNet(state_dim, hidden_dim, action_dim).to(device) self.target_critic_2 = QValueNet(state_dim, hidden_dim, action_dim).to(device) self.target_critic_1.load_state_dict(self.critic_1.state_dict()) self.target_critic_2.load_state_dict(self.critic_2.state_dict()) self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr) self.critic_1_optimizer = torch.optim.Adam(self.critic_1.parameters(), lr=critic_lr) self.critic_2_optimizer = torch.optim.Adam(self.critic_2.parameters(), lr=critic_lr) self.log_alpha = torch.tensor(np.log(0.01 ), dtype=torch.float ) self.log_alpha.requires_grad = True self.log_alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=alpha_lr) self.target_entropy = target_entropy self.gamma = gamma self.tau = tau def take_action (self, state ): state = torch.tensor([state], dtype=torch.float ).to(device) action = self.actor(state)[0 ] return [action.item()] def calc_target (self, rewards, next_states, dones ): next_actions, log_prob = self.actor(next_states) entropy = -log_prob q1_value = self.target_critic_1(next_states, next_actions) q2_value = self.target_critic_2(next_states, next_actions) next_value = torch.min (q1_value, q2_value) + self.log_alpha.exp() * entropy td_target = rewards + self.gamma * next_value * (1 - dones) return td_target def soft_update (self, net, target_net ): for param_target, param in zip (target_net.parameters(), net.parameters()): param_target.data.copy_(param_target.data * (1.0 - self.tau) + param.data * self.tau) def update (self, transition_dict ): states = torch.tensor(transition_dict['states' ], dtype=torch.float ).to(device) actions = torch.tensor(transition_dict['actions' ], dtype=torch.float ).view(-1 , 1 ).to(device) rewards = torch.tensor(transition_dict['rewards' ], dtype=torch.float ).view(-1 , 1 ).to(device) next_states = torch.tensor(transition_dict['next_states' ], dtype=torch.float ).to(device) dones = torch.tensor(transition_dict['dones' ], dtype=torch.float ).view(-1 , 1 ).to(device) rewards = (rewards + 8.0 ) / 8.0 td_target = self.calc_target(rewards, next_states, dones) critic_1_loss = torch.mean(F.mse_loss(self.critic_1(states, actions), td_target.detach())) critic_2_loss = torch.mean(F.mse_loss(self.critic_2(states, actions), td_target.detach())) self.critic_1_optimizer.zero_grad() critic_1_loss.backward() self.critic_1_optimizer.step() self.critic_2_optimizer.zero_grad() critic_2_loss.backward() self.critic_2_optimizer.step() new_actions, log_prob = self.actor(states) entropy = -log_prob q1_value = self.critic_1(states, new_actions) q2_value = self.critic_2(states, new_actions) actor_loss = torch.mean(-self.log_alpha.exp() * entropy - torch.min (q1_value, q2_value)) self.actor_optimizer.zero_grad() actor_loss.backward() self.actor_optimizer.step() alpha_loss = torch.mean((entropy - self.target_entropy).detach() * self.log_alpha.exp()) self.log_alpha_optimizer.zero_grad() alpha_loss.backward() self.log_alpha_optimizer.step() self.soft_update(self.critic_1, self.target_critic_1) self.soft_update(self.critic_2, self.target_critic_2)
接下来定义环境模型,注意这里的环境模型和 PETS 算法中的环境模型是一样的,由多个高斯分布策略的集成来构建。我们也沿用 PETS 算法中的模型构建代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class Swish (nn.Module): ''' Swish激活函数 ''' def __init__ (self ): super (Swish, self).__init__() def forward (self, x ): return x * torch.sigmoid(x)def init_weights (m ): ''' 初始化模型权重 ''' def truncated_normal_init (t, mean=0.0 , std=0.01 ): torch.nn.init.normal_(t, mean=mean, std=std) while True : cond = (t < mean - 2 * std) | (t > mean + 2 * std) if not torch.sum (cond): break t = torch.where( cond, torch.nn.init.normal_( torch.ones(t.shape, device=device), mean=mean, std=std ), t ) return t if type (m) == nn.Linear or isinstance (m, FCLayer): truncated_normal_init(m.weight, std= 1 / (2 * np.sqrt(m._input_dim))) m.bias.data.fill_(0.0 )class FCLayer (nn.Module): ''' 集成之后的全连接层 ''' def __init__ (self, input_dim, output_dim, ensemble_size, activation ): super (FCLayer, self).__init__() self._input_dim, self._output_dim = input_dim, output_dim self.weight = nn.Parameter( torch.Tensor(ensemble_size, input_dim, output_dim).to(device)) self._activation = activation self.bias = nn.Parameter( torch.Tensor(ensemble_size, output_dim).to(device)) def forward (self, x ): return self._activation( torch.add(torch.bmm(x, self.weight), self.bias[:, None , :]))
接着,我们就可以定义集成模型了,其中就会用到刚刚定义的全连接层。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 class EnsembleModel (nn.Module): ''' 环境模型集成 ''' def __init__ (self, state_dim, action_dim, model_alpha, ensemble_size=5 , learning_rate=1e-3 ): super (EnsembleModel, self).__init__() self._output_dim = (state_dim + 1 ) * 2 self._model_alpha = model_alpha self._max_logvar = nn.Parameter((torch.ones( (1 , self._output_dim // 2 )).float () / 2 ).to(device), requires_grad=False ) self._min_logvar = nn.Parameter((-torch.ones( (1 , self._output_dim // 2 )).float () * 10 ).to(device), requires_grad=False ) self.layer1 = FCLayer(state_dim + action_dim, 200 , ensemble_size, Swish()) self.layer2 = FCLayer(200 , 200 , ensemble_size, Swish()) self.layer3 = FCLayer(200 , 200 , ensemble_size, Swish()) self.layer4 = FCLayer(200 , 200 , ensemble_size, Swish()) self.layer5 = FCLayer(200 , self._output_dim, ensemble_size, nn.Identity()) self.apply(init_weights) self.optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate) def forward (self, x, return_log_var=False ): ret = self.layer5(self.layer4(self.layer3(self.layer2( self.layer1(x))))) mean = ret[:, :, :self._output_dim // 2 ] logvar = ret[:, :, self._output_dim // 2 :] logvar = self._max_logvar - F.softplus(self._max_logvar - logvar) logvar = self._min_logvar + F.softplus(logvar - self._min_logvar) return mean, logvar if return_log_var else torch.exp(logvar) def loss (self, mean, logvar, labels, use_var_loss=True ): inverse_var = torch.exp(-logvar) if use_var_loss: mse_loss = torch.mean( torch.mean( torch.pow (mean - labels, 2 ) * inverse_var, dim=-1 ), dim=-1 ) var_loss = torch.mean(torch.mean(logvar, dim=-1 ), dim=-1 ) total_loss = torch.sum (mse_loss) + torch.sum (var_loss) else : mse_loss = torch.mean(torch.pow (mean - labels, 2 ), dim=(1 , 2 )) total_loss = torch.sum (mse_loss) return total_loss, mse_loss def train (self, loss ): self.optimizer.zero_grad() loss += self._model_alpha * torch.sum ( self._max_logvar) - self._model_alpha * torch.sum (self._min_logvar) loss.backward() self.optimizer.step()class EnsembleDynamicsModel : ''' 环境模型集成,加入精细化的训练 ''' def __init__ (self, state_dim, action_dim, model_alpha=0.01 , num_network=5 ): self._num_network = num_network self._state_dim, self._action_dim = state_dim, action_dim self.model = EnsembleModel( state_dim, action_dim, model_alpha, ensemble_size=num_network ) self._epoch_since_last_update = 0 def train ( self, inputs, labels, batch_size=64 , holdout_ratio=0.1 , max_iter=20 ): permutation = np.random.permutation(inputs.shape[0 ]) inputs, labels = inputs[permutation], labels[permutation] num_holdout = int (inputs.shape[0 ] * holdout_ratio) train_inputs, train_labels = inputs[num_holdout:], labels[num_holdout:] holdout_inputs, holdout_labels = inputs[:num_holdout], labels[:num_holdout] holdout_inputs = torch.from_numpy(holdout_inputs).float ().to(device) holdout_labels = torch.from_numpy(holdout_labels).float ().to(device) holdout_inputs = holdout_inputs[None , :, :].repeat([self._num_network, 1 , 1 ]) holdout_labels = holdout_labels[None , :, :].repeat([self._num_network, 1 , 1 ]) self._snapshots = {i: (None , 1e10 ) for i in range (self._num_network)} for epoch in itertools.count(): train_index = np.vstack([ np.random.permutation(train_inputs.shape[0 ]) for _ in range (self._num_network) ]) for batch_start_pos in range (0 , train_inputs.shape[0 ], batch_size): batch_index = train_index[:, batch_start_pos:batch_start_pos + batch_size] train_input = torch.from_numpy(train_inputs[batch_index]).float ().to(device) train_label = torch.from_numpy(train_labels[batch_index]).float ().to(device) mean, logvar = self.model(train_input, return_log_var=True ) loss, _ = self.model.loss(mean, logvar, train_label) self.model.train(loss) with torch.no_grad(): mean, logvar = self.model(holdout_inputs, return_log_var=True ) _, holdout_losses = self.model.loss(mean, logvar, holdout_labels, use_var_loss=False ) holdout_losses = holdout_losses.cpu() break_condition = self._save_best(epoch, holdout_losses) if break_condition or epoch > max_iter: break def _save_best (self, epoch, losses, threshold=0.1 ): updated = False for i in range (len (losses)): current = losses[i] _, best = self._snapshots[i] improvement = (best - current) / best if improvement > threshold: self._snapshots[i] = (epoch, current) updated = True self._epoch_since_last_update = 0 if updated else self._epoch_since_last_update + 1 return self._epoch_since_last_update > 5 def predict (self, inputs, batch_size=64 ): inputs = np.tile(inputs, (self._num_network, 1 , 1 )) inputs = torch.tensor(inputs, dtype=torch.float ).to(device) mean, var = self.model(inputs, return_log_var=False ) return mean.detach().cpu().numpy(), var.detach().cpu().numpy()class FakeEnv : def __init__ (self, model ): self.model = model def step (self, obs, act ): inputs = np.concatenate((obs, act), axis=-1 ) ensemble_model_means, ensemble_model_vars = self.model.predict(inputs) ensemble_model_means[:, :, 1 :] += obs ensemble_model_stds = np.sqrt(ensemble_model_vars) ensemble_samples = ensemble_model_means + np.random.normal(size=ensemble_model_means.shape) * ensemble_model_stds num_models, batch_size, _ = ensemble_model_means.shape models_to_use = np.random.choice([i for i in range (self.model._num_network)], size=batch_size) batch_inds = np.arange(0 , batch_size) samples = ensemble_samples[models_to_use, batch_inds] rewards, next_obs = samples[:, :1 ][0 ][0 ], samples[:, 1 :][0 ] return rewards, next_obs
最后,我们来实现 MBPO 算法的具体流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 class MBPO : def __init__ (self, env, agent, fake_env, env_pool, model_pool, rollout_length, rollout_batch_size, real_ratio, num_episode ): self.env = env self.agent = agent self.fake_env = fake_env self.env_pool = env_pool self.model_pool = model_pool self.rollout_length = rollout_length self.rollout_batch_size = rollout_batch_size self.real_ratio = real_ratio self.num_episode = num_episode def rollout_model (self ): observations, _, _, _, _ = self.env_pool.sample(self.rollout_batch_size) for obs in observations: for i in range (self.rollout_length): action = self.agent.take_action(obs) reward, next_obs = self.fake_env.step(obs, action) self.model_pool.add(obs, action, reward, next_obs, False ) obs = next_obs def update_agent (self, policy_train_batch_size=64 ): env_batch_size = int (policy_train_batch_size * self.real_ratio) model_batch_size = policy_train_batch_size - env_batch_size for epoch in range (10 ): env_obs, env_action, env_reward, env_next_obs, env_done = self.env_pool.sample(env_batch_size) if self.model_pool.size() > 0 : model_obs, model_action, model_reward, model_next_obs, model_done = self.model_pool.sample(model_batch_size) obs = np.concatenate((env_obs, model_obs), axis=0 ) action = np.concatenate((env_action, model_action), axis=0 ) next_obs = np.concatenate((env_next_obs, model_next_obs), axis=0 ) reward = np.concatenate((env_reward, model_reward), axis=0 ) done = np.concatenate((env_done, model_done), axis=0 ) else : obs, action, next_obs, reward, done = env_obs, env_action, env_next_obs, env_reward, env_done transition_dict = { 'states' : obs, 'actions' : action, 'next_states' : next_obs, 'rewards' : reward, 'dones' : done } self.agent.update(transition_dict) def train_model (self ): obs, action, reward, next_obs, done = self.env_pool.return_all_samples() inputs = np.concatenate((obs, action), axis=-1 ) reward = np.array(reward) labels = np.concatenate( (np.reshape(reward, (reward.shape[0 ], -1 )), next_obs - obs), axis=-1 ) self.fake_env.model.train(inputs, labels) def explore (self ): obs, done, episode_return = self.env.reset(), False , 0 while not done: action = self.agent.take_action(obs) next_obs, reward, done, _ = self.env.step(action) self.env_pool.add(obs, action, reward, next_obs, done) obs = next_obs episode_return += reward return episode_return def train (self ): return_list = [] explore_return = self.explore() print ('episode: 1, return: %d' % explore_return) return_list.append(explore_return) for i_episode in range (self.num_episode - 1 ): obs, done, episode_return = self.env.reset(), False , 0 step = 0 while not done: if step % 50 == 0 : self.train_model() self.rollout_model() action = self.agent.take_action(obs) next_obs, reward, done, _ = self.env.step(action) self.env_pool.add(obs, action, reward, next_obs, done) obs = next_obs episode_return += reward self.update_agent() step += 1 return_list.append(episode_return) print ('episode: %d, return: %d' % (i_episode + 2 , episode_return)) return return_listclass ReplayBuffer : def __init__ (self, capacity ): self.buffer = collections.deque(maxlen=capacity) def add (self, state, action, reward, next_state, done ): self.buffer.append((state, action, reward, next_state, done)) def size (self ): return len (self.buffer) def sample (self, batch_size ): if batch_size > len (self.buffer): return self.return_all_samples() else : transitions = random.sample(self.buffer, batch_size) state, action, reward, next_state, done = zip (*transitions) return np.array(state), action, reward, np.array(next_state), done def return_all_samples (self ): all_transitions = list (self.buffer) state, action, reward, next_state, done = zip (*all_transitions) return np.array(state), action, reward, np.array(next_state), done
对于不同的环境,我们需要设置不同的参数。这里以 OpenAI Gym 中的 Pendulum-v0 环境为例,给出一组效果较为不错的参数。读者可以试着自己调节参数,观察调节后的效果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 real_ratio = 0.5 env_name = 'Pendulum-v0' env = gym.make(env_name) num_episodes = 20 actor_lr = 5e-4 critic_lr = 5e-3 alpha_lr = 1e-3 hidden_dim = 128 gamma = 0.98 tau = 0.005 buffer_size = 10000 target_entropy = -1 model_alpha = 0.01 state_dim = env.observation_space.shape[0 ] action_dim = env.action_space.shape[0 ] action_bound = env.action_space.high[0 ] rollout_batch_size = 1000 rollout_length = 1 model_pool_size = rollout_batch_size * rollout_length agent = SAC(state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, alpha_lr, target_entropy, tau, gamma) model = EnsembleDynamicsModel(state_dim, action_dim, model_alpha) fake_env = FakeEnv(model) env_pool = ReplayBuffer(buffer_size) model_pool = ReplayBuffer(model_pool_size) mbpo = MBPO(env, agent, fake_env, env_pool, model_pool, rollout_length, rollout_batch_size, real_ratio, num_episodes) return_list = mbpo.train() episodes_list = list (range (len (return_list))) plt.plot(episodes_list, return_list) plt.xlabel('Episodes' ) plt.ylabel('Returns' ) plt.title('MBPO on {}' .format (env_name)) plt.show()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 episode: 1, return: -1617 episode: 2, return: -1463 episode: 3, return: -1407 episode: 4, return: -929 episode: 5, return: -860 episode: 6, return: -643 episode: 7, return: -128 episode: 8, return: -368 episode: 9, return: -118 episode: 10, return: -123 episode: 11, return: -122 episode: 12, return: -118 episode: 13, return: -119 episode: 14, return: -119 episode: 15, return: -121 episode: 16, return: -123 episode: 17, return: 0 episode: 18, return: -125 episode: 19, return: -126 episode: 20, return: -243
可以看到,相比无模型的强化学习算法,基于模型的方法 MBPO 在样本效率上要高很多。虽然这里的效果可能不如 16.3 节提到的 PETS 算法优秀,但是在许多更加复杂的环境中(如 Hopper 和 HalfCheetah),MBPO 的表现远远好于 PETS 算法。
17.4 小结
MBPO 算法是一种前沿的基于模型的强化学习算法,它提出了一个重要的概念——分支推演。在各种复杂的环境中,作者验证了 MBPO 的效果超过了之前基于模型的方法。MBPO 对于基于模型的强化学习的发展起着重要的作用,不少之后的工作都是在此基础上进行的。
除了算法的有效性,MBPO 的重要贡献还包括它给出了关于分支推演步数与模型误差、策略偏移程度之间的定量关系,进而阐明了什么时候我们可以相信并使用环境模型,什么样的环境导出的最优分支推演步数为 0,进而建议不使用环境模型。相应的理论分析在 17.5 节给出。
17.5 拓展阅读:MBPO 理论分析
17.5.1 性能提升的单调性保障
基于模型的方法往往是在环境模型中提升策略的性能,但这并不能保证在真实环境中策略性能也有所提升。因此,我们希望模型环境和真实环境中的结果的差距有一定的限制,具体可形式化为:
η ( π ) ≥ η ^ ( π ) − C \eta(\pi) \ge \hat\eta(\pi) - C
η ( π ) ≥ η ^ ( π ) − C
其中,η ( π ) \eta(\pi) η ( π ) 表示策略在真实环境中的期望回报,而η ^ ( π ) \hat\eta(\pi) η ^ ( π ) 表示策略在模型环境中的期望回报。这一公式保证了在模型环境中提高策略性能超过C C C 时,就可以在真实环境中取得策略性能的提升。
在 MBPO 中,根据泛化误差和分布偏移估计出这样一个下界:
η ( π ) ≥ η ^ ( π ) − [ 2 γ r max ( ϵ m + 2 ϵ π ) ( 1 − γ ) 2 + 4 r max ϵ π ( 1 − γ ) ] \eta(\pi) \ge \hat\eta(\pi) - \bigg[
\dfrac{2\gamma r_{\max} (\epsilon_m + 2\epsilon_\pi)}{(1-\gamma)^2} + \dfrac{4r_{\max}\epsilon_\pi}{(1-\gamma)}
\bigg]
η ( π ) ≥ η ^ ( π ) − [ ( 1 − γ ) 2 2 γ r m a x ( ϵ m + 2 ϵ π ) + ( 1 − γ ) 4 r m a x ϵ π ]
其中,ϵ m = max t E s ∼ π D , t [ D T V ( p ( s ′ , r ∣ s , a ) ∣ ∣ p θ ( s ′ , r ∣ s , a ) ) ] \displaystyle\epsilon_m = \max_t \mathbb{E}_{s\sim\pi_{D,t}}[D_{TV}(p(s',r|s,a)||p_\theta(s',r|s,a))] ϵ m = t max E s ∼ π D , t [ D T V ( p ( s ′ , r ∣ s , a ) ∣∣ p θ ( s ′ , r ∣ s , a ))] 刻画了模型泛化误差,而ϵ π = max s D T V ( π ∣ ∣ π D ) \displaystyle\epsilon_\pi = \max_s D_{TV}(\pi||\pi_D) ϵ π = s max D T V ( π ∣∣ π D ) 刻画了当前策略π \pi π 与数据收集策略π D \pi_D π D 之间的策略转移 (policy shift)。
17.5.2 模型推演长度
在上面的公式里,如果模型泛化误差很大,就可能不存在一个使得推演误差C C C 最小的正数推演步数k k k ,进而无法使用模型。因此作者提出了分支推演 (branched rollout)的办法,即从之前访问过的状态开始进行有限制的推演,从而保证模型工作时的泛化误差不要太大。
如果我们使用当前策略π t \pi_t π t 而非本轮之前的数据收集策略π D , t \pi_{D,t} π D , t 来估计模型误差(记为ϵ m ′ \epsilon_m' ϵ m ′ ),那么有:
ϵ m ′ = max t E s ∼ π t [ D T V ( p ( s ′ , r ∣ s , a ) ∣ ∣ p θ ( s ′ , r ∣ s , a ) ) ] \epsilon_m' = \max_t \mathbb{E}_{s\sim\pi_t}\Big[
D_{TV}\Big( p(s',r|s,a) || p_\theta(s', r|s, a) \Big)
\Big]
ϵ m ′ = t max E s ∼ π t [ D T V ( p ( s ′ , r ∣ s , a ) ∣∣ p θ ( s ′ , r ∣ s , a ) ) ]
并且对其进行线性近似:
ϵ m ′ ≈ ϵ m + ϵ π d ϵ m ′ d ϵ π \epsilon_m' \approx \epsilon_m + \epsilon_\pi \dfrac{\mathrm{d}\epsilon_m'}{\mathrm{d}\epsilon_\pi}
ϵ m ′ ≈ ϵ m + ϵ π d ϵ π d ϵ m ′
结合上k k k 步分支推演,我们就可以得到一个新的策略期望回报界:
η ( π ) ≥ η branch ( π ) − 2 r max [ γ k + 1 ϵ π ( 1 − γ ) 2 + γ k ϵ π ( 1 − γ ) + k 1 − γ ϵ m ′ ] \eta(\pi) \ge \eta^{\text{branch}}(\pi) - 2 r_{\max} \bigg[
\dfrac{\gamma^{k+1}\epsilon_\pi}{(1-\gamma)^2} +
\dfrac{\gamma^{k}\epsilon_\pi}{(1-\gamma)} +
\dfrac{k}{1-\gamma}\epsilon_m'
\bigg]
η ( π ) ≥ η branch ( π ) − 2 r m a x [ ( 1 − γ ) 2 γ k + 1 ϵ π + ( 1 − γ ) γ k ϵ π + 1 − γ k ϵ m ′ ]
其中,η branch ( π ) \eta^{\text{branch}}(\pi) η branch ( π ) 表示使用分支推演的方法得到的策略期望回报。通过以上公式,我们就可以得到理论最优的推演步长,即:
k ∗ = arg min k [ γ k + 1 ϵ π ( 1 − γ ) 2 + γ k ϵ π ( 1 − γ ) + k 1 − γ ϵ m ′ ] k^{*} = \argmin_k \bigg[
\dfrac{\gamma^{k+1}\epsilon_\pi}{(1-\gamma)^2} +
\dfrac{\gamma^{k}\epsilon_\pi}{(1-\gamma)} +
\dfrac{k}{1-\gamma}\epsilon_m'
\bigg]
k ∗ = k arg min [ ( 1 − γ ) 2 γ k + 1 ϵ π + ( 1 − γ ) γ k ϵ π + 1 − γ k ϵ m ′ ]
在上式中可以看到,对于γ ∈ ( 0 , 1 ) \gamma\in(0,1) γ ∈ ( 0 , 1 ) ,当推演步数变大时,γ k + 1 ϵ π ( 1 − γ ) 2 + γ k ϵ π ( 1 − γ ) \dfrac{\gamma^{k+1}\epsilon_\pi}{(1-\gamma)^2} + \dfrac{\gamma^{k}\epsilon_\pi}{(1-\gamma)} ( 1 − γ ) 2 γ k + 1 ϵ π + ( 1 − γ ) γ k ϵ π 变小,而k 1 − γ ϵ m ′ \dfrac{k}{1-\gamma}\epsilon_m' 1 − γ k ϵ m ′ 变大。更进一步,如果ϵ m ′ \epsilon_m' ϵ m ′ 足够小的话,由ϵ m ′ ≈ ϵ m + ϵ π d ϵ m ′ d ϵ π \epsilon_m' \approx \epsilon_m + \epsilon_\pi \dfrac{\mathrm{d}\epsilon_m'}{\mathrm{d}\epsilon_\pi} ϵ m ′ ≈ ϵ m + ϵ π d ϵ π d ϵ m ′ 的关系可知,如果d ϵ m ′ d ϵ π \dfrac{\mathrm{d}\epsilon_m'}{\mathrm{d}\epsilon_\pi} d ϵ π d ϵ m ′ 足够小,那么最优推演步长k k k 就是为正,此时分支推演(或者说使用基于模型的方法)就是一个有效的方法。
MBPO 论文中展示了在主流的机器人运动环境 Mojoco 的典型场景中,d ϵ m ′ d ϵ π \dfrac{\mathrm{d}\epsilon_m'}{\mathrm{d}\epsilon_\pi} d ϵ π d ϵ m ′ 的数量级非常小,大约都在[ 1 0 − 4 , 1 0 − 2 ] [10^{-4}, 10^{-2}] [ 1 0 − 4 , 1 0 − 2 ] 区间,而且可以看出它随着训练数据的增多的而不断下降,说明模型的泛化能力逐渐增强,而我们对于推演步长为正数的假设也是合理的。但要知道,并不是所有的强化学习环境都可以有如此小的d ϵ m ′ d ϵ π \dfrac{\mathrm{d}\epsilon_m'}{\mathrm{d}\epsilon_\pi} d ϵ π d ϵ m ′ 。例如在高随机性的离散状态环境中,往往环境模型的拟合精度较低,以至于d ϵ m ′ d ϵ π \dfrac{\mathrm{d}\epsilon_m'}{\mathrm{d}\epsilon_\pi} d ϵ π d ϵ m ′ 较大,此时使用基于分支推演的方法效果有限。
17.6 参考文献
[1] JANNER M, FU J, ZHANG M, et al. When to trust your model: model-based policy optimization [J]. Advances in neural information processing systems 2019, 32: 12519-12530.
18 离线强化学习
18.1 简介
在前面的学习中,我们已经对强化学习有了不少了解。无论是在线策略 (on-policy)算法还是离线策略 (off-policy)算法,都有一个共同点:智能体在训练过程中可以不断和环境交互,得到新的反馈数据。二者的区别主要在于在线策略算法会直接使用这些反馈数据,而离线策略算法会先将数据存入经验回放池中,需要时再采样。然而,在现实生活中的许多场景下,让尚未学习好的智能体和环境交互可能会导致危险发生,或是造成巨大损失。例如,在训练自动驾驶的规控智能体时,如果让智能体从零开始和真实环境进行交互,那么在训练的最初阶段,它操控的汽车无疑会横冲直撞,造成各种事故。再例如,在推荐系统中,用户的反馈往往比较滞后,统计智能体策略的回报需要很长时间。而如果策略存在问题,早期的用户体验不佳,就会导致用户流失等后果。因此,离线强化学习 (offline reinforcement learning)的目标是,在智能体不和环境交互的情况下,仅从已经收集好的确定的数据集中,通过强化学习算法得到比较好的策略。离线强化学习和在线策略算法、离线策略算法的区别如图 18-1 所示。
图18-1 离线强化学习和在线策略算法、离线策略算法的区别
18.2 批量限制 Q-learning 算法
图 18-1 中的离线强化学习和离线策略强化学习很像,都要从经验回放池中采样进行训练,并且离线策略算法的策略评估方式也多种多样。因此,研究者们最开始尝试将离线策略算法直接照搬到离线的环境下,仅仅是去掉算法中和环境交互的部分。然而,这种做法完全失败了。研究者进行了 3 个简单的实验。第一个实验,作者使用 DDPG 算法训练了一个智能体,并将智能体与环境交互的所有数据都记录下来,再用这些数据训练离线 DDPG 智能体。第二个实验,在线 DDPG 算法在训练时每次从经验回放池中采样,并用相同的数据同步训练离线 DDPG 智能体,这样两个智能体甚至连训练时用到的数据顺序都完全相同。第三个实验,在线 DDPG 算法在训练完毕后作为专家,在环境中采集大量数据,供离线 DDPG 智能体学习。这 3 个实验,即完全回放、同步训练、模仿训练的结果依次如图 18-2 所示。
让人惊讶的是,3 个实验中,离线 DDPG 智能体的表现都远远差于在线 DDPG 智能体,即便是第二个实验的同步训练都无法提高离线智能体的表现。在第三个模仿训练实验中,离线智能体面对非常优秀的数据样本却什么都没学到!针对这种情况,研究者指出,外推误差 (extrapolation error)是离线策略算法不能直接迁移到离线环境中的原因。
外推误差,是指由于当前策略可能访问到的状态动作对与从数据集中采样得到的状态动作对的分布不匹配而产生的误差。为什么在线强化学习算法没有受到外推误差的影响呢?因为对于在线强化学习,即使训练是离线策略的,智能体依然有机会通过与环境交互及时采样到新的数据,从而修正这些误差。但是在离线强化学习中,智能体无法和环境交互。因此,一般来说,离线强化学习算法要想办法尽可能地限制外推误差的大小,从而得到较好的策略。
为了减少外推误差,当前的策略需要做到只访问与数据集中相似的( s , a ) (s,a) ( s , a ) 数据。满足这一要求的策略称为批量限制策略 (batch-constrained policy)。具体来说,这样的策略在选择动作时有 3 个目标:
最小化选择的动作与数据集中数据的距离;
采取动作后能到达与离线数据集中状态相似的状态;
最大化函数Q Q Q 。
对于标准的表格 (tabular)型环境,状态和动作空间都是离散且有限的。标准的 Q-learning 更新公式可以写为:
Q ( s , a ) ← ( 1 − α ) Q ( s , a ) + α ( r + γ Q ( s ′ , arg max a ′ Q ( s ′ , a ′ ) ) ) Q(s,a) \leftarrow (1-\alpha) Q(s,a) + \alpha \bigg(r + \gamma Q\Big(s', \argmax_{a'}Q(s',a')\Big)\bigg)
Q ( s , a ) ← ( 1 − α ) Q ( s , a ) + α ( r + γ Q ( s ′ , a ′ arg max Q ( s ′ , a ′ ) ) )
这时,只需要把策略π \pi π 能选择的动作限制在数据集D \mathcal{D} D 内,就能满足上述 3 个目标的平衡,这样就得到了表格设定下的批量限制 Q-learning(batch-constrained Q-learning,BCQ)算法:
Q ( s , a ) ← ( 1 − α ) Q ( s , a ) + α ( r + γ Q ( s ′ , arg max a ′ s.t. ( s ′ , a ′ ) ∈ D Q ( s ′ , a ′ ) ) ) Q(s,a) \leftarrow (1-\alpha) Q(s,a) + \alpha \bigg(r + \gamma Q\Big(s', \argmax_{\underset{\text{s.t.}(s',a') \in\mathcal{D}}{a'}}Q(s',a')\Big)\bigg)
Q ( s , a ) ← ( 1 − α ) Q ( s , a ) + α ( r + γ Q ( s ′ , s.t. ( s ′ , a ′ ) ∈ D a ′ arg max Q ( s ′ , a ′ ) ) )
可以证明,如果数据中包含了所有可能的( s , a ) (s,a) ( s , a ) 对,按上式进行迭代可以收敛到最优的价值函数Q ∗ Q^{*} Q ∗ 。
连续状态和动作的情况要复杂一些,因为批量限制策略的目标需要被更详细地定义。例如,该如何定义两个状态动作对的距离呢?BCQ采用了一种巧妙的方法:训练一个生成模型G ω ( s ) G_\omega(s) G ω ( s ) 。对于数据集D \mathcal{D} D 和其中的状态s s s ,生成模型G ω ( s ) G_\omega(s) G ω ( s ) 能给出与D \mathcal{D} D 中数据接近的一系列动作a 1 , ⋯ , a n a_1,\cdots,a_n a 1 , ⋯ , a n 用于Q Q Q 网络的训练。更进一步,为了增加生成动作的多样性,减少生成次数,BCQ 还引入了扰动模型ξ ϕ ( s , a , Φ ) \xi_\phi(s,a,\Phi) ξ ϕ ( s , a , Φ ) 。输入( s , a ) (s,a) ( s , a ) 时,模型给出一个绝对值最大为Φ \Phi Φ 的微扰并附加在动作上。这两个模型综合起来相当于给出了一个批量限制策略π \pi π :
π ( s ) = arg max a i + ξ ϕ ( s , a i , Φ ) Q θ ( s , a i + ξ ϕ ( s , a i , Φ ) ) , { a i ∼ G ω ( s ) } i = 1 n \pi(s) = \argmax_{a_i + \xi_\phi(s,a_i,\Phi)} Q_\theta(s,a_i + \xi_\phi(s,a_i,\Phi)), \quad \{a_i \sim G_\omega(s)\}_{i=1}^n
π ( s ) = a i + ξ ϕ ( s , a i , Φ ) arg max Q θ ( s , a i + ξ ϕ ( s , a i , Φ )) , { a i ∼ G ω ( s ) } i = 1 n
其中,生成模型G ω ( s ) G_\omega(s) G ω ( s ) 用变分自动编码器 (variational auto-encoder, VAE)实现;扰动模型直接通过确定性策略梯度算法训练,目标是使Q Q Q 函数最大化:
ϕ ← arg max ϕ ∑ ( s , a ) ∈ D Q θ ( s , a + ξ ϕ ( s , a , Φ ) ) \phi \leftarrow \argmax_\phi \sum_{(s,a)\in\mathcal{D}} Q_\theta(s,a + \xi_\phi(s,a,\Phi))
ϕ ← ϕ arg max ( s , a ) ∈ D ∑ Q θ ( s , a + ξ ϕ ( s , a , Φ ))
总结起来,BCQ 算法的流程如下:
随机初始化Q Q Q 网络Q θ Q_\theta Q θ 、扰动网络ξ ϕ \xi_\phi ξ ϕ 、生成网络G ω = { E ω 1 , D ω 2 } G_\omega=\{E_{\omega_1}, D_{\omega_2}\} G ω = { E ω 1 , D ω 2 }
用θ \theta θ 初始化目标Q Q Q 网络Q θ − Q_{\theta^{-}} Q θ − ,用ϕ \phi ϕ 初始化目标扰动网络ξ ϕ ′ \xi_{\phi'} ξ ϕ ′
for 训练次数e = 1 → E e=1\rightarrow E e = 1 → E do
从数据集D \mathcal{D} D 中采样一定数量的( s , a , r , s ′ ) (s,a,r,s') ( s , a , r , s ′ )
编码器生成均值和标准差μ , σ = E ω 1 ( s , a ) \mu,\sigma=E_{\omega_1}(s,a) μ , σ = E ω 1 ( s , a )
解码器生成动作a ~ = D ω 2 ( s , z ) \tilde{a}=D_{\omega_2}(s,z) a ~ = D ω 2 ( s , z ) ,其中z ∼ N ( μ , σ ) z\sim\mathcal{N}(\mu,\sigma) z ∼ N ( μ , σ )
更新生成模型:ω ← arg min ω ∑ ( a − a ~ ) 2 + D K L ( N ( μ , σ ) ∣ ∣ N ( 0 , 1 ) ) \displaystyle \omega \leftarrow \argmin_\omega\sum(a - \tilde{a})^2 + D_{KL}(\mathcal{N}(\mu,\sigma)||\mathcal{N}(0, 1)) ω ← ω arg min ∑ ( a − a ~ ) 2 + D K L ( N ( μ , σ ) ∣∣ N ( 0 , 1 ))
从生成模型中采样n n n 个动作:{ a i ∼ G ω ( s ′ ) } i = 1 n \{a_i \sim G_{\omega}(s')\}_{i=1}^n { a i ∼ G ω ( s ′ ) } i = 1 n
对每个动作施加扰动:{ a i ← a i + ξ ϕ ( s ′ , a i , ϕ ) } i = 1 n \{a_i \leftarrow a_i + \xi_\phi(s',a_i,\phi)\}_{i=1}^n { a i ← a i + ξ ϕ ( s ′ , a i , ϕ ) } i = 1 n
计算Q Q Q 网络的目标值y = r + γ max a i Q θ − ( s ′ , a i ) \displaystyle y=r+\gamma\max_{a_i}Q_{\theta^{-}}(s',a_i) y = r + γ a i max Q θ − ( s ′ , a i )
更新Q Q Q 网络:θ ← arg min θ ∑ ( y − Q θ ( s , a ) ) 2 \displaystyle\theta\leftarrow\argmin_\theta\sum(y-Q_\theta(s,a))^2 θ ← θ arg min ∑ ( y − Q θ ( s , a ) ) 2
更新扰动网络:ϕ ← arg max ϕ ∑ Q θ ( s , a + ξ ϕ ( s , a , Φ ) ) , a ∼ G ω ( s ) \displaystyle\phi \leftarrow \argmax_\phi\sum Q_\theta(s,a+\xi_\phi(s,a,\Phi)), a\sim G_\omega(s) ϕ ← ϕ arg max ∑ Q θ ( s , a + ξ ϕ ( s , a , Φ )) , a ∼ G ω ( s )
更新目标Q Q Q 网络:θ − ← τ θ + ( 1 − τ ) θ − \theta^{-} \leftarrow \tau\theta + (1-\tau)\theta^{-} θ − ← τ θ + ( 1 − τ ) θ −
更新目标扰动网络:ϕ ′ ← τ ϕ + ( 1 − τ ) ϕ ′ \phi' \leftarrow \tau\phi + (1-\tau)\phi' ϕ ′ ← τ ϕ + ( 1 − τ ) ϕ ′
end for
除此之外,BCQ 还使用了一些实现上的小技巧。由于不是 BCQ 的重点,此处不再赘述。考虑到 VAE 不属于本书的讨论范围,并且 BCQ 的代码中有较多技巧,有兴趣的读者可以参阅 BCQ 原文,自行实现代码。此处介绍 BCQ 算法,一是因为它对离线强化学习的误差分析和实验很有启发性,二是因为它是无模型离线强化学习中限制策略集合算法中的经典方法。下面我们介绍另一类直接限制Q Q Q 函数的算法的代表:保守 Q-learning。
18.3 保守 Q-learning 算法
18.2 节已经讲到,离线强化学习面对的巨大挑战是如何减少外推误差。实验证明,外推误差主要会导致在远离数据集的点上函数Q Q Q 的过高估计,甚至常常出现Q Q Q 值向上发散的情况。因此,如果能用某种方法将算法中偏离数据集的点上的函数Q Q Q 保持在很低的值,或许能消除部分外推误差的影响,这就是保守 Q-learning (conservative Q-learning,CQL)算法的基本思想。CQL 在普通的贝尔曼方程上引入一些额外的限制项,达到了这一目标。接下来一步步介绍 CQL 算法的思路。
在普通的 Q-learning 中,Q Q Q 的更新方程可以写为:
Q ^ k + 1 ← arg min Q E ( s , a ) ∼ D [ ( Q ( s , a ) − B ^ π Q ^ k ( s , a ) ) 2 ] \hat{Q}^{k+1} \leftarrow \argmin_{Q} \mathbb{E}_{(s,a)\sim\mathcal{D}}\bigg[
\Big( Q(s,a) - \hat{\mathcal{B}}^{\pi}\hat{Q}^k(s,a) \Big)^2
\bigg]
Q ^ k + 1 ← Q arg min E ( s , a ) ∼ D [ ( Q ( s , a ) − B ^ π Q ^ k ( s , a ) ) 2 ]
其中,B ^ π \hat{\mathcal{B}}^{\pi} B ^ π 是实际计算时策略π \pi π 的贝尔曼算子。为了防止Q Q Q 值在各个状态上(尤其是不在数据集中的状态上)的过高估计,我们要对某些状态上的高Q Q Q 值进行惩罚。考虑一般情况,我们希望Q Q Q 在某个特定分布μ ( s , a ) \mu(s,a) μ ( s , a ) 上的期望值最小。在上式中,B ^ π \hat{\mathcal{B}}^{\pi} B ^ π 的计算需要用到s , a , s ′ , a ′ s,a,s',a' s , a , s ′ , a ′ ,但只有a ′ a' a ′ 是生成的,可能不在数据集中。因此,我们对数据集中的状态s s s 按策略μ \mu μ 得到的动作进行惩罚:
Q ^ k + 1 ← arg min Q β ⋅ E s ∼ D , a ∼ μ ( a ∣ s ) [ Q ( s , a ) ] + 1 2 E ( s , a ) ∼ D [ ( Q ( s , a ) − B ^ π Q ^ k ( s , a ) ) 2 ] \hat{Q}^{k+1} \leftarrow \argmin_{Q} \beta \cdot \mathbb{E}_{s\sim\mathcal{D}, a\sim\mu(a|s)}\Big[Q(s,a)\Big] + \dfrac{1}{2} \mathbb{E}_{(s,a)\sim\mathcal{D}}\bigg[
\Big( Q(s,a) - \hat{\mathcal{B}}^{\pi}\hat{Q}^k(s,a) \Big)^2
\bigg]
Q ^ k + 1 ← Q arg min β ⋅ E s ∼ D , a ∼ μ ( a ∣ s ) [ Q ( s , a ) ] + 2 1 E ( s , a ) ∼ D [ ( Q ( s , a ) − B ^ π Q ^ k ( s , a ) ) 2 ]
其中,β \beta β 是平衡因子。可以证明,上式迭代收敛给出的函数Q Q Q 在任何( s , a ) (s,a) ( s , a ) 上的值都比真实值要小。不过,如果我们放宽条件,只追求Q Q Q 在π ( a ∣ s ) \pi(a|s) π ( a ∣ s ) 上的期望值V π V^\pi V π 比真实值小的话,就可以略微放松对上式的约束。一个自然的想法是,对于符合用于生成数据集的行为策略π b \pi_b π b 的数据点,我们可以认为Q Q Q 对这些点的估值较为准确,在这些点上不必限制让Q Q Q 值很小。作为对第一项的补偿,将上式改为:
Q ^ k + 1 ← arg min Q β ⋅ ( E s ∼ D , a ∼ μ ( a ∣ s ) [ Q ( s , a ) ] − E s ∼ D , a ∼ π ^ b ( a ∣ s ) [ Q ( s , a ) ] ) + 1 2 E ( s , a ) ∼ D [ ( Q ( s , a ) − B ^ π Q ^ k ( s , a ) ) 2 ] \hat{Q}^{k+1} \leftarrow \argmin_{Q} \beta \cdot \Big(\mathbb{E}_{s\sim\mathcal{D}, a\sim\mu(a|s)}\Big[Q(s,a)\Big] - \mathbb{E}_{s\sim\mathcal{D}, a\sim\hat\pi_b(a|s)}\Big[Q(s,a)\Big]\Big) + \dfrac{1}{2} \mathbb{E}_{(s,a)\sim\mathcal{D}}\bigg[
\Big( Q(s,a) - \hat{\mathcal{B}}^{\pi}\hat{Q}^k(s,a) \Big)^2
\bigg]
Q ^ k + 1 ← Q arg min β ⋅ ( E s ∼ D , a ∼ μ ( a ∣ s ) [ Q ( s , a ) ] − E s ∼ D , a ∼ π ^ b ( a ∣ s ) [ Q ( s , a ) ] ) + 2 1 E ( s , a ) ∼ D [ ( Q ( s , a ) − B ^ π Q ^ k ( s , a ) ) 2 ]
将行为策略π b \pi_b π b 写为π ^ b \hat{\pi}_b π ^ b 是因为我们无法获知真实的行为策略,只能通过数据集中已有的数据近似得到。可以证明,当μ = π \mu=\pi μ = π 时,上式迭代收敛得到的函数Q Q Q 虽然不是在每一点上都小于真实值,但其期望是小于真实值的,即E π ( a ∣ s ) [ Q ^ π ( s , a ) ] ≤ V π ( s ) \displaystyle \mathbb{E}_{\pi(a|s)}[\hat{Q}^{\pi}(s,a)]\le V^{\pi}(s) E π ( a ∣ s ) [ Q ^ π ( s , a )] ≤ V π ( s ) 。
至此,CQL 算法已经有了理论上的保证,但仍有一个缺陷:计算的时间开销太大了。当令μ = π \mu=\pi μ = π 时,在Q Q Q 迭代的每一步,算法都要对策略π ^ k \hat\pi^k π ^ k 做完整的离线策略评估来计算上式中的arg min \argmin arg min ,再进行一次策略迭代,而离线策略评估是非常耗时的。既然π \pi π 并非与Q Q Q 独立,而是通过Q Q Q 值最大的动作衍生出来的,那么我们完全可以用使Q Q Q 取最大值的μ \mu μ 去近似π \pi π ,即:
π ≈ max μ E s ∼ D , a ∼ μ ( a ∣ s ) [ Q ( s , a ) ] \pi \approx \max_\mu \mathbb{E}_{s\sim\mathcal{D}, a\sim\mu(a|s)}[Q(s,a)]
π ≈ μ max E s ∼ D , a ∼ μ ( a ∣ s ) [ Q ( s , a )]
为了防止过拟合,再加上正则项R ( μ ) \mathcal{R}(\mu) R ( μ ) 。综合起来就得到完整的迭代方程 :
Q ^ k + 1 ← arg min Q β ⋅ ( E s ∼ D , a ∼ μ ( a ∣ s ) [ Q ( s , a ) ] − E s ∼ D , a ∼ π ^ b ( a ∣ s ) [ Q ( s , a ) ] ) + 1 2 E ( s , a ) ∼ D [ ( Q ( s , a ) − B ^ π Q ^ k ( s , a ) ) 2 ] + R ( μ ) \hat{Q}^{k+1} \leftarrow \argmin_{Q} \beta \cdot \Big(\mathbb{E}_{s\sim\mathcal{D}, a\sim\mu(a|s)}\Big[Q(s,a)\Big] - \mathbb{E}_{s\sim\mathcal{D}, a\sim\hat\pi_b(a|s)}\Big[Q(s,a)\Big]\Big) + \dfrac{1}{2} \mathbb{E}_{(s,a)\sim\mathcal{D}}\bigg[
\Big( Q(s,a) - \hat{\mathcal{B}}^{\pi}\hat{Q}^k(s,a) \Big)^2
\bigg] + \mathcal{R}(\mu)
Q ^ k + 1 ← Q arg min β ⋅ ( E s ∼ D , a ∼ μ ( a ∣ s ) [ Q ( s , a ) ] − E s ∼ D , a ∼ π ^ b ( a ∣ s ) [ Q ( s , a ) ] ) + 2 1 E ( s , a ) ∼ D [ ( Q ( s , a ) − B ^ π Q ^ k ( s , a ) ) 2 ] + R ( μ )
正则项采用和一个先验策略ρ ( a ∣ s ) \rho(a|s) ρ ( a ∣ s ) 的 KL 距离,即R ( μ ) = − D K L ( μ , ρ ) \mathcal{R}(\mu) = -D_{KL}(\mu,\rho) R ( μ ) = − D K L ( μ , ρ ) 。一般来说,取为均匀分布U ( a ) \mathcal{U}(a) U ( a ) 即可,这样可以将迭代方程化简为:
Q ^ k + 1 ← arg min Q β ⋅ E s ∼ D [ log ∑ a e Q ( s , a ) − E a ∼ π ^ b ( a ∣ s ) [ Q ( s , a ) ] ] + 1 2 E ( s , a ) ∼ D [ ( Q ( s , a ) − B ^ π Q ^ k ( s , a ) ) 2 ] \hat{Q}^{k+1} \leftarrow \argmin_{Q} \beta \cdot \mathbb{E}_{s\sim\mathcal{D}}\Big[\log\sum_a e^{Q(s,a)} - \mathbb{E}_{a\sim\hat{\pi}_b(a|s)} [Q(s,a)]\Big] + \dfrac{1}{2} \mathbb{E}_{(s,a)\sim\mathcal{D}}\bigg[
\Big( Q(s,a) - \hat{\mathcal{B}}^{\pi}\hat{Q}^k(s,a) \Big)^2
\bigg]
Q ^ k + 1 ← Q arg min β ⋅ E s ∼ D [ log a ∑ e Q ( s , a ) − E a ∼ π ^ b ( a ∣ s ) [ Q ( s , a )] ] + 2 1 E ( s , a ) ∼ D [ ( Q ( s , a ) − B ^ π Q ^ k ( s , a ) ) 2 ]
可以注意到,简化后式中已经不含有μ \mu μ ,为计算提供了很大方便。该化简需要进行一些数学推导,详细过程附在 18.6 节中,感兴趣的读者也可以先尝试自行推导。
上面给出了函数Q Q Q 的迭代方法。CQL 属于直接在函数Q Q Q 上做限制的一类方法,对策略π \pi π 没有特殊要求,因此参考文献中分别给出了基于 DQN 和 SAC 两种框架的 CQL 算法。考虑到后者应用更广泛,且参考文献中大部分的实验结果都是基于后者得出的,这里只介绍基于 SAC 的版本。对 SAC 的策略迭代和自动调整熵正则系数不熟悉的读者,可以先阅读第 14 章的相关内容。
总结起来,CQL 算法流程如下:
初始化Q Q Q 网络Q θ Q_\theta Q θ 、目标Q Q Q 网络Q θ ′ Q_{\theta'} Q θ ′ 和策略π ϕ \pi_\phi π ϕ 、熵正则系数α \alpha α
for 训练次数t = 1 → T t=1\rightarrow T t = 1 → T do
更新熵正则系数:α t ← α t − 1 − η α ∇ α E s ∼ D , a ∼ π ϕ ( a ∣ s ) [ − α t − 1 log π ϕ ( a ∣ s ) − α t − 1 H ] \displaystyle\alpha_t \leftarrow \alpha_{t-1} - \eta_\alpha \nabla_\alpha \mathbb{E}_{s\sim\mathcal{D},a\sim\pi_\phi(a|s)}[-\alpha_{t-1}\log\pi_\phi(a|s)-\alpha_{t-1}\mathcal{H}] α t ← α t − 1 − η α ∇ α E s ∼ D , a ∼ π ϕ ( a ∣ s ) [ − α t − 1 log π ϕ ( a ∣ s ) − α t − 1 H ]
更新函数Q Q Q :θ t ← θ t − 1 − η q ∇ θ ( α ⋅ E s ∼ D [ log ∑ a e Q θ ( s , a ) − E a ∼ π ^ b ( a ∣ s ) [ Q θ ( s , a ) ] ] + 1 2 E ( s , a ) ∼ D [ ( Q θ ( s , a ) − B π ϕ Q θ ( s , a ) ) 2 ] ) \displaystyle\theta_t \leftarrow \theta_{t-1} - \eta_q\nabla_\theta \bigg(\alpha\cdot\mathbb{E}_{s\sim\mathcal{D}}\Big[\log\sum_a e^{Q_\theta(s,a)} - \mathbb{E}_{a\sim\hat{\pi}_b(a|s)} [Q_\theta(s,a)]\Big] + \dfrac{1}{2} \mathbb{E}_{(s,a)\sim\mathcal{D}}\bigg[\Big( Q_\theta(s,a) - \mathcal{B}^{\pi_\phi}Q_\theta(s,a) \Big)^2\bigg] \bigg) θ t ← θ t − 1 − η q ∇ θ ( α ⋅ E s ∼ D [ log a ∑ e Q θ ( s , a ) − E a ∼ π ^ b ( a ∣ s ) [ Q θ ( s , a )] ] + 2 1 E ( s , a ) ∼ D [ ( Q θ ( s , a ) − B π ϕ Q θ ( s , a ) ) 2 ] )
更新策略:ϕ t ← ϕ t − 1 − η π ∇ ϕ E s ∼ D , a ∼ π ϕ ( a ∣ s ) [ α log π ϕ ( a ∣ s ) − Q θ ( s , a ) ] \phi_t \leftarrow \phi_{t-1} - \eta_\pi \nabla_\phi \mathbb{E}_{s\sim\mathcal{D}, a\sim\pi_\phi(a|s)}\Big[\alpha\log\pi_\phi(a|s) - Q_\theta(s,a)\Big] ϕ t ← ϕ t − 1 − η π ∇ ϕ E s ∼ D , a ∼ π ϕ ( a ∣ s ) [ α log π ϕ ( a ∣ s ) − Q θ ( s , a ) ]
end for
18.4 CQL 代码实践
下面在倒立摆环境中实现基础的 CQL 算法。该环境在前面的章节中已出现了多次,这里不再重复介绍。首先导入必要的库。
1 2 3 4 5 6 7 8 9 10 import numpy as npimport gymfrom tqdm import tqdmimport randomimport rl_utilsimport torchimport torch.nn as nnimport torch.nn.functional as Ffrom torch.distributions import Normalimport matplotlib.pyplot as plt
为了生成数据集,在倒立摆环境中从零开始训练一个在线 SAC 智能体,直到算法达到收敛效果,把训练过程中智能体采集的所有轨迹保存下来作为数据集。这样,数据集中既包含训练初期较差策略的采样,又包含训练后期较好策略的采样,是一个混合数据集。下面给出生成数据集的代码,SAC 部分直接使用 14.5 节中的代码,因此不再详细解释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 class PolicyNetContinuous (torch.nn.Module): def __init__ (self, state_dim, hidden_dim, action_dim, action_bound ): super (PolicyNetContinuous, self).__init__() self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc_mu = torch.nn.Linear(hidden_dim, action_dim) self.fc_std = torch.nn.Linear(hidden_dim, action_dim) self.action_bound = action_bound def forward (self, x ): x = F.relu(self.fc1(x)) mu = self.fc_mu(x) std = F.softplus(self.fc_std(x)) dist = Normal(mu, std) normal_sample = dist.rsample() log_prob = dist.log_prob(normal_sample) action = torch.tanh(normal_sample) log_prob = log_prob - torch.log(1 - torch.tanh(action).pow (2 ) + 1e-7 ) action = action * self.action_bound return action, log_probclass QValueNetContinuous (torch.nn.Module): def __init__ (self, state_dim, hidden_dim, action_dim ): super (QValueNetContinuous, self).__init__() self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) self.fc_out = torch.nn.Linear(hidden_dim, 1 ) def forward (self, x, a ): cat = torch.cat([x, a], dim=1 ) x = F.relu(self.fc1(cat)) x = F.relu(self.fc2(x)) return self.fc_out(x)class SACContinuous : ''' 处理连续动作的SAC算法 ''' def __init__ (self, state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, alpha_lr, target_entropy, tau, gamma, device ): self.actor = PolicyNetContinuous(state_dim, hidden_dim, action_dim, action_bound).to(device) self.critic_1 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) self.critic_2 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) self.target_critic_1 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) self.target_critic_2 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) self.target_critic_1.load_state_dict(self.critic_1.state_dict()) self.target_critic_2.load_state_dict(self.critic_2.state_dict()) self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr) self.critic_1_optimizer = torch.optim.Adam(self.critic_1.parameters(), lr=critic_lr) self.critic_2_optimizer = torch.optim.Adam(self.critic_2.parameters(), lr=critic_lr) self.log_alpha = torch.tensor(np.log(0.01 ), dtype=torch.float ) self.log_alpha.requires_grad = True self.log_alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=alpha_lr) self.target_entropy = target_entropy self.gamma = gamma self.tau = tau self.device = device def take_action (self, state ): state = torch.tensor([state], dtype=torch.float ).to(self.device) action = self.actor(state)[0 ] return [action.item()] def calc_target (self, rewards, next_states, dones ): next_actions, log_prob = self.actor(next_states) entropy = -log_prob q1_value = self.target_critic_1(next_states, next_actions) q2_value = self.target_critic_2(next_states, next_actions) next_value = torch.min (q1_value, q2_value) + self.log_alpha.exp() * entropy td_target = rewards + self.gamma * next_value * (1 - dones) return td_target def soft_update (self, net, target_net ): for param_target, param in zip (target_net.parameters(), net.parameters()): param_target.data.copy_(param_target.data * (1.0 - self.tau) + param.data * self.tau) def update (self, transition_dict ): states = torch.tensor(transition_dict['states' ], dtype=torch.float ).to(self.device) actions = torch.tensor(transition_dict['actions' ], dtype=torch.float ).view(-1 , 1 ).to(self.device) rewards = torch.tensor(transition_dict['rewards' ], dtype=torch.float ).view(-1 , 1 ).to(self.device) next_states = torch.tensor(transition_dict['next_states' ], dtype=torch.float ).to(self.device) dones = torch.tensor(transition_dict['dones' ], dtype=torch.float ).view(-1 , 1 ).to(self.device) rewards = (rewards + 8.0 ) / 8.0 td_target = self.calc_target(rewards, next_states, dones) critic_1_loss = torch.mean(F.mse_loss(self.critic_1(states, actions), td_target.detach())) critic_2_loss = torch.mean(F.mse_loss(self.critic_2(states, actions), td_target.detach())) self.critic_1_optimizer.zero_grad() critic_1_loss.backward() self.critic_1_optimizer.step() self.critic_2_optimizer.zero_grad() critic_2_loss.backward() self.critic_2_optimizer.step() new_actions, log_prob = self.actor(states) entropy = -log_prob q1_value = self.critic_1(states, new_actions) q2_value = self.critic_2(states, new_actions) actor_loss = torch.mean(-self.log_alpha.exp() * entropy - torch.min (q1_value, q2_value)) self.actor_optimizer.zero_grad() actor_loss.backward() self.actor_optimizer.step() alpha_loss = torch.mean((entropy - self.target_entropy).detach() * self.log_alpha.exp()) self.log_alpha_optimizer.zero_grad() alpha_loss.backward() self.log_alpha_optimizer.step() self.soft_update(self.critic_1, self.target_critic_1) self.soft_update(self.critic_2, self.target_critic_2) env_name = 'Pendulum-v0' env = gym.make(env_name) state_dim = env.observation_space.shape[0 ] action_dim = env.action_space.shape[0 ] action_bound = env.action_space.high[0 ] random.seed(0 ) np.random.seed(0 ) env.seed(0 ) torch.manual_seed(0 ) actor_lr = 3e-4 critic_lr = 3e-3 alpha_lr = 3e-4 num_episodes = 100 hidden_dim = 128 gamma = 0.99 tau = 0.005 buffer_size = 100000 minimal_size = 1000 batch_size = 64 target_entropy = -env.action_space.shape[0 ] device = torch.device("cuda" ) if torch.cuda.is_available() else torch.device("cpu" ) replay_buffer = rl_utils.ReplayBuffer(buffer_size) agent = SACContinuous(state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, alpha_lr, target_entropy, tau, gamma, device) return_list = rl_utils.train_off_policy_agent(env, agent, num_episodes, replay_buffer, minimal_size, batch_size)
1 2 3 4 5 6 7 8 9 10 Iteration 0: 100%|██████████| 10/10 [00:08<00:00, 1.23it/s, episode=10, return=-1534.655] Iteration 1: 100%|██████████| 10/10 [00:15<00:00, 1.52s/it, episode=20, return=-1085.715] Iteration 2: 100%|██████████| 10/10 [00:15<00:00, 1.53s/it, episode=30, return=-364.507] Iteration 3: 100%|██████████| 10/10 [00:15<00:00, 1.53s/it, episode=40, return=-222.485] Iteration 4: 100%|██████████| 10/10 [00:15<00:00, 1.58s/it, episode=50, return=-157.978] Iteration 5: 100%|██████████| 10/10 [00:15<00:00, 1.55s/it, episode=60, return=-166.056] Iteration 6: 100%|██████████| 10/10 [00:15<00:00, 1.55s/it, episode=70, return=-143.147] Iteration 7: 100%|██████████| 10/10 [00:15<00:00, 1.57s/it, episode=80, return=-127.939] Iteration 8: 100%|██████████| 10/10 [00:15<00:00, 1.55s/it, episode=90, return=-180.905] Iteration 9: 100%|██████████| 10/10 [00:15<00:00, 1.53s/it, episode=100, return=-171.265]
1 2 3 4 5 6 episodes_list = list (range (len (return_list))) plt.plot(episodes_list, return_list) plt.xlabel('Episodes' ) plt.ylabel('Returns' ) plt.title('SAC on {}' .format (env_name)) plt.show()
下面实现本章重点讨论的 CQL 算法,它在 SAC 的代码基础上做了修改。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 class CQL : ''' CQL算法 ''' def __init__ (self, state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, alpha_lr, target_entropy, tau, gamma, device, beta, num_random ): self.actor = PolicyNetContinuous(state_dim, hidden_dim, action_dim, action_bound).to(device) self.critic_1 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) self.critic_2 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) self.target_critic_1 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) self.target_critic_2 = QValueNetContinuous(state_dim, hidden_dim, action_dim).to(device) self.target_critic_1.load_state_dict(self.critic_1.state_dict()) self.target_critic_2.load_state_dict(self.critic_2.state_dict()) self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr) self.critic_1_optimizer = torch.optim.Adam(self.critic_1.parameters(), lr=critic_lr) self.critic_2_optimizer = torch.optim.Adam(self.critic_2.parameters(), lr=critic_lr) self.log_alpha = torch.tensor(np.log(0.01 ), dtype=torch.float ) self.log_alpha.requires_grad = True self.log_alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=alpha_lr) self.target_entropy = target_entropy self.gamma = gamma self.tau = tau self.beta = beta self.num_random = num_random def take_action (self, state ): state = torch.tensor([state], dtype=torch.float ).to(device) action = self.actor(state)[0 ] return [action.item()] def soft_update (self, net, target_net ): for param_target, param in zip (target_net.parameters(), net.parameters()): param_target.data.copy_(param_target.data * (1.0 - self.tau) + param.data * self.tau) def update (self, transition_dict ): states = torch.tensor(transition_dict['states' ], dtype=torch.float ).to(device) actions = torch.tensor(transition_dict['actions' ], dtype=torch.float ).view(-1 , 1 ).to(device) rewards = torch.tensor(transition_dict['rewards' ], dtype=torch.float ).view(-1 , 1 ).to(device) next_states = torch.tensor(transition_dict['next_states' ], dtype=torch.float ).to(device) dones = torch.tensor(transition_dict['dones' ], dtype=torch.float ).view(-1 , 1 ).to(device) rewards = (rewards + 8.0 ) / 8.0 next_actions, log_prob = self.actor(next_states) entropy = -log_prob q1_value = self.target_critic_1(next_states, next_actions) q2_value = self.target_critic_2(next_states, next_actions) next_value = torch.min (q1_value, q2_value) + self.log_alpha.exp() * entropy td_target = rewards + self.gamma * next_value * (1 - dones) critic_1_loss = torch.mean(F.mse_loss(self.critic_1(states, actions), td_target.detach())) critic_2_loss = torch.mean(F.mse_loss(self.critic_2(states, actions), td_target.detach())) batch_size = states.shape[0 ] random_unif_actions = torch.rand( [batch_size * self.num_random, actions.shape[-1 ]], dtype=torch.float ).uniform_(-1 , 1 ).to(device) random_unif_log_pi = np.log(0.5 **next_actions.shape[-1 ]) tmp_states = states.unsqueeze(1 ).repeat(1 , self.num_random, 1 ).view(-1 , states.shape[-1 ]) tmp_next_states = next_states.unsqueeze(1 ).repeat(1 , self.num_random, 1 ).view(-1 , next_states.shape[-1 ]) random_curr_actions, random_curr_log_pi = self.actor(tmp_states) random_next_actions, random_next_log_pi = self.actor(tmp_next_states) q1_unif = self.critic_1(tmp_states, random_unif_actions).view(-1 , self.num_random, 1 ) q2_unif = self.critic_2(tmp_states, random_unif_actions).view(-1 , self.num_random, 1 ) q1_curr = self.critic_1(tmp_states, random_curr_actions).view(-1 , self.num_random, 1 ) q2_curr = self.critic_2(tmp_states, random_curr_actions).view(-1 , self.num_random, 1 ) q1_next = self.critic_1(tmp_states, random_next_actions).view(-1 , self.num_random, 1 ) q2_next = self.critic_2(tmp_states, random_next_actions).view(-1 , self.num_random, 1 ) q1_cat = torch.cat([ q1_unif - random_unif_log_pi, q1_curr - random_curr_log_pi.detach().view(-1 , self.num_random, 1 ), q1_next - random_next_log_pi.detach().view(-1 , self.num_random, 1 ) ], dim=1 ) q2_cat = torch.cat([ q2_unif - random_unif_log_pi, q2_curr - random_curr_log_pi.detach().view(-1 , self.num_random, 1 ), q2_next - random_next_log_pi.detach().view(-1 , self.num_random, 1 ) ], dim=1 ) qf1_loss_1 = torch.logsumexp(q1_cat, dim=1 ).mean() qf2_loss_1 = torch.logsumexp(q2_cat, dim=1 ).mean() qf1_loss_2 = self.critic_1(states, actions).mean() qf2_loss_2 = self.critic_2(states, actions).mean() qf1_loss = critic_1_loss + self.beta * (qf1_loss_1 - qf1_loss_2) qf2_loss = critic_2_loss + self.beta * (qf2_loss_1 - qf2_loss_2) self.critic_1_optimizer.zero_grad() qf1_loss.backward(retain_graph=True ) self.critic_1_optimizer.step() self.critic_2_optimizer.zero_grad() qf2_loss.backward(retain_graph=True ) self.critic_2_optimizer.step() new_actions, log_prob = self.actor(states) entropy = -log_prob q1_value = self.critic_1(states, new_actions) q2_value = self.critic_2(states, new_actions) actor_loss = torch.mean(-self.log_alpha.exp() * entropy - torch.min (q1_value, q2_value)) self.actor_optimizer.zero_grad() actor_loss.backward() self.actor_optimizer.step() alpha_loss = torch.mean((entropy - self.target_entropy).detach() * self.log_alpha.exp()) self.log_alpha_optimizer.zero_grad() alpha_loss.backward() self.log_alpha_optimizer.step() self.soft_update(self.critic_1, self.target_critic_1) self.soft_update(self.critic_2, self.target_critic_2)
接下来设置好超参数,就可以开始训练了。最后再绘图看一下算法的表现。因为不能通过与环境交互来获得新的数据,离线算法最终的效果和数据集有很大关系,并且波动会比较大。通常来说,调参后数据集中的样本质量越高,算法的表现就越好。感兴趣的读者可以使用其他方式生成数据集,并观察算法效果的变化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 random.seed(0 ) np.random.seed(0 ) env.seed(0 ) torch.manual_seed(0 ) beta = 5.0 num_random = 5 num_epochs = 100 num_trains_per_epoch = 500 agent = CQL(state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, alpha_lr, target_entropy, tau, gamma, device, beta, num_random) return_list = []for i in range (10 ): with tqdm(total=int (num_epochs / 10 ), desc='Iteration %d' % i) as pbar: for i_epoch in range (int (num_epochs / 10 )): epoch_return = 0 state = env.reset() done = False while not done: action = agent.take_action(state) next_state, reward, done, _ = env.step(action) state = next_state epoch_return += reward return_list.append(epoch_return) for _ in range (num_trains_per_epoch): b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size) transition_dict = { 'states' : b_s, 'actions' : b_a, 'next_states' : b_ns, 'rewards' : b_r, 'dones' : b_d } agent.update(transition_dict) if (i_epoch + 1 ) % 10 == 0 : pbar.set_postfix({ 'epoch' : '%d' % (num_epochs / 10 * i + i_epoch + 1 ), 'return' : '%.3f' % np.mean(return_list[-10 :]) }) pbar.update(1 )
1 2 3 4 5 6 7 8 9 10 Iteration 0: 100%|██████████| 10/10 [01:26<00:00, 8.62s/it, epoch=10, return=-941.721] Iteration 1: 100%|██████████| 10/10 [01:27<00:00, 8.72s/it, epoch=20, return=-432.056] Iteration 2: 100%|██████████| 10/10 [01:26<00:00, 8.68s/it, epoch=30, return=-810.899] Iteration 3: 100%|██████████| 10/10 [01:27<00:00, 8.70s/it, epoch=40, return=-636.281] Iteration 4: 100%|██████████| 10/10 [01:26<00:00, 8.64s/it, epoch=50, return=-224.978] Iteration 5: 100%|██████████| 10/10 [01:27<00:00, 8.79s/it, epoch=60, return=-298.303] Iteration 6: 100%|██████████| 10/10 [01:30<00:00, 9.05s/it, epoch=70, return=-210.535] Iteration 7: 100%|██████████| 10/10 [01:29<00:00, 8.99s/it, epoch=80, return=-209.631] Iteration 8: 100%|██████████| 10/10 [01:29<00:00, 8.98s/it, epoch=90, return=-213.836] Iteration 9: 100%|██████████| 10/10 [01:33<00:00, 9.36s/it, epoch=100, return=-206.435]
1 2 3 4 5 6 7 8 9 10 11 12 13 epochs_list = list (range (len (return_list))) plt.plot(epochs_list, return_list) plt.xlabel('Epochs' ) plt.ylabel('Returns' ) plt.title('CQL on {}' .format (env_name)) plt.show() mv_return = rl_utils.moving_average(return_list, 9 ) plt.plot(episodes_list, mv_return) plt.xlabel('Episodes' ) plt.ylabel('Returns' ) plt.title('CQL on {}' .format (env_name)) plt.show()
18.5 总结
本章介绍了离线强化学习的基本概念和两个与模型无关的离线强化学习算法——BCQ 和 CQL,并讲解了 CQL 的代码。事实上,离线强化学习还有一类基于模型的方法,如 model-based offline reinforcement learning (MOReL)和 model-based offline policy optimization(MOPO),本章由于篇幅原因不再介绍。这一类算法的思路基本是通过模型生成更多数据,同时通过衡量模型预测的不确定性来对生成的偏离数据集的数据进行惩罚,感兴趣的读者可以自行查阅相关资料。
离线强化学习的另一大难点是算法通常对超参数极为敏感,非常难调参。并且在实际复杂场景中通常不能像在模拟器中那样,每训练几轮就在环境中评估策略好坏,如何确定何时停止算法也是离线强化学习在实际应用中面临的一大挑战。此外,离线强化学习在现实场景中的落地还需要关注离散策略评估和选择、数据收集策略的保守性和数据缺失性等现实问题。不过无论如何,离线强化学习和模仿学习都是为了解决在现实中训练智能体的困难而提出的,也都是强化学习真正落地的重要途径。
18.6 扩展阅读
这里对 CQL 算法中R ( μ ) = − D K L ( μ , U ( a ) ) \mathcal{R}(\mu) = -D_{KL}(\mu,\mathcal{U}(a)) R ( μ ) = − D K L ( μ , U ( a )) 的情况给出详细推导。对于一般的变量x ∈ D x\in\mathcal{D} x ∈ D 及其概率密度函数μ ( x ) \mu(x) μ ( x ) ,首先有归一化条件:
∫ D μ ( x ) d x = 1 \int_{\mathcal{D}}\mu(x) \mathrm{d}x = 1
∫ D μ ( x ) d x = 1
把 KL 散度展开,由于U ( x ) = 1 ∣ D ∣ \mathcal{U}(x) = \dfrac{1}{|\mathcal{D}|} U ( x ) = ∣ D ∣ 1 是常数,因此得到
D K L ( μ ( x ) , Unif ( x ) ) = ∫ D μ ( x ) log μ ( x ) U ( x ) d x = ∫ D μ ( x ) log μ ( x ) d x − ∫ D μ ( x ) log U ( x ) d x = ∫ D μ ( x ) log μ ( x ) d x − log 1 ∣ D ∣ ∫ D μ ( x ) d x = ∫ D μ ( x ) log μ ( x ) d x + log ∣ D ∣ \begin{aligned}
D_{KL}(\mu(x), \text{Unif}(x)) &= \int_{\mathcal{D}}\mu(x)\log\dfrac{\mu(x)}{\mathcal{U}(x)} \mathrm{d}x\\
&= \int_{\mathcal{D}}\mu(x)\log\mu(x) \mathrm{d}x - \int_{\mathcal{D}}\mu(x)\log\mathcal{U}(x) \mathrm{d}x\\
&= \int_{\mathcal{D}}\mu(x)\log\mu(x) \mathrm{d}x - \log\dfrac{1}{|\mathcal{D}|}\int_{\mathcal{D}}\mu(x) \mathrm{d}x\\
&= \int_{\mathcal{D}}\mu(x)\log\mu(x) \mathrm{d}x + \log |\mathcal{D}|
\end{aligned}
D K L ( μ ( x ) , Unif ( x )) = ∫ D μ ( x ) log U ( x ) μ ( x ) d x = ∫ D μ ( x ) log μ ( x ) d x − ∫ D μ ( x ) log U ( x ) d x = ∫ D μ ( x ) log μ ( x ) d x − log ∣ D ∣ 1 ∫ D μ ( x ) d x = ∫ D μ ( x ) log μ ( x ) d x + log ∣ D ∣
回到 18.3 节,可以发现,在原迭代方程中,含有μ \mu μ 的只有max μ E s ∼ D , a ∼ μ ( a ∣ s ) [ Q ( s , a ) ] \displaystyle\max_\mu\mathbb{E}_{s\sim\mathcal{D},a\sim\mu(a|s)}[Q(s,a)] μ max E s ∼ D , a ∼ μ ( a ∣ s ) [ Q ( s , a )] 和R ( μ ) \mathcal{R}(\mu) R ( μ ) 两项。在分布μ ( a ∣ s ) \mu(a|s) μ ( a ∣ s ) 的条件下,状态s s s 是给定的,因此第一项中s s s 的采样和μ \mu μ 无关,第一项可以抽象为max μ E x ∼ μ ( x ) [ f ( x ) ] \displaystyle\max_\mu\mathbb{E}_{x\sim\mu(x)}[f(x)] μ max E x ∼ μ ( x ) [ f ( x )] 。此外,概率密度函数应恒大于等于零,即μ ( x ) ≥ 0 \mu(x)\ge 0 μ ( x ) ≥ 0 。最后舍去同样和μ \mu μ 无关的常数log ∣ D ∣ \log|\mathcal{D}| log ∣ D ∣ 。综合起来,我们要求解如下的优化问题:
max μ ∫ D μ ( x ) f ( x ) − μ ( x ) log μ ( x ) d x , s.t. ∫ D μ ( x ) d x = 1 , μ ( x ) ≥ 0 \max_\mu \int_{\mathcal{D}}\mu(x) f(x) - \mu(x)\log\mu(x)\mathrm{d}x, \quad \text{s.t.}\int_{\mathcal{D}}\mu(x)\mathrm{d}x=1,\mu(x)\ge0
μ max ∫ D μ ( x ) f ( x ) − μ ( x ) log μ ( x ) d x , s.t. ∫ D μ ( x ) d x = 1 , μ ( x ) ≥ 0
这一问题的求解要用到变分法。对于等式约束和不等式约束,引入拉格朗日乘数λ \lambda λ 和松弛函数κ ( x ) 2 = μ ( x ) − 0 \kappa(x)^2=\mu(x)-0 κ ( x ) 2 = μ ( x ) − 0 ,得到相应的无约束优化问题:
max μ J ( μ ) = max μ ∫ D F ( x , μ , μ ′ ) d x = ∫ D f ( x ) μ ( x ) − μ ( x ) log μ ( x ) + λ μ ( x ) d x \max_\mu J(\mu) = \max_\mu \int_{\mathcal{D}} F(x,\mu,\mu')\mathrm{d}x = \int_{\mathcal{D}}f(x)\mu(x) - \mu(x)\log\mu(x)+\lambda\mu(x)\mathrm{d}x
μ max J ( μ ) = μ max ∫ D F ( x , μ , μ ′ ) d x = ∫ D f ( x ) μ ( x ) − μ ( x ) log μ ( x ) + λ μ ( x ) d x
其中,μ ′ \mu' μ ′ 是d μ d x \dfrac{\mathrm{d}\mu}{\mathrm{d}x} d x d μ 的简写。可以发现,F ( x , μ , μ ′ ) F(x,\mu,\mu') F ( x , μ , μ ′ ) 事实上与μ ′ \mu' μ ′ 无关,可以写为F ( x , μ ) F(x,\mu) F ( x , μ ) 。代入μ ( x ) = κ ( x ) 2 \mu(x)=\kappa(x)^2 μ ( x ) = κ ( x ) 2 ,得到
J ( κ ) = ∫ D F ( x , κ 2 ) d x J(\kappa) = \int_{\mathcal{D}}F(x,\kappa^2)\mathrm{d}x
J ( κ ) = ∫ D F ( x , κ 2 ) d x
写出欧拉-拉格朗日方程∂ F ∂ κ − d d x ∂ F ∂ κ ′ = 0 \dfrac{\partial F}{\partial \kappa} - \dfrac{\mathrm{d}}{\mathrm{d}x}\dfrac{\partial F}{\partial \kappa'}=0 ∂ κ ∂ F − d x d ∂ κ ′ ∂ F = 0 ,分别计算
∂ F ∂ κ = ∂ F ∂ μ ∂ μ ∂ κ + ∂ F ∂ μ ′ ∂ μ ′ ∂ κ = 2 κ ∂ F ∂ μ + 2 κ ′ ∂ F ∂ μ ′ = 2 κ ∂ F ∂ μ \dfrac{\partial F}{\partial \kappa} = \dfrac{\partial F}{\partial \mu} \dfrac{\partial \mu}{\partial \kappa} + \dfrac{\partial F}{\partial \mu'}\dfrac{\partial \mu'}{\partial \kappa} = 2\kappa\dfrac{\partial F}{\partial \mu} + 2\kappa'\dfrac{\partial F}{\partial \mu'} = 2\kappa\dfrac{\partial F}{\partial \mu}
∂ κ ∂ F = ∂ μ ∂ F ∂ κ ∂ μ + ∂ μ ′ ∂ F ∂ κ ∂ μ ′ = 2 κ ∂ μ ∂ F + 2 κ ′ ∂ μ ′ ∂ F = 2 κ ∂ μ ∂ F
由于第二项F F F 和κ ′ \kappa' κ ′ 无关,直接等于0 0 0 。最终拉格朗日方程简化为:
2 κ ∂ F ∂ μ = 0 2\kappa\dfrac{\partial F}{\partial \mu} = 0
2 κ ∂ μ ∂ F = 0
两项的乘积等于零,因此在每一点上,要么κ ( x ) = 0 \kappa(x)=0 κ ( x ) = 0 ,即μ ( x ) = κ ( x ) 2 = 0 \mu(x)=\kappa(x)^2=0 μ ( x ) = κ ( x ) 2 = 0 ,要么μ ( x ) \mu(x) μ ( x ) 满足∂ F ∂ μ = 0 \dfrac{\partial F}{\partial \mu}=0 ∂ μ ∂ F = 0 。先来解后面的方程,直接计算得到
∂ F ∂ μ = f ( x ) + λ − log μ ( x ) + 1 = 0 ⇒ μ ( x ) = e λ + 1 e f ( x ) \dfrac{\partial F}{\partial \mu} = f(x) + \lambda - \log\mu(x) + 1 = 0 \quad\Rightarrow\quad \mu(x)=e^{\lambda + 1}e^{f(x)}
∂ μ ∂ F = f ( x ) + λ − log μ ( x ) + 1 = 0 ⇒ μ ( x ) = e λ + 1 e f ( x )
最终的解μ ( x ) \mu(x) μ ( x ) 应是μ ( x ) = κ ( x ) 2 = 0 \mu(x)=\kappa(x)^2=0 μ ( x ) = κ ( x ) 2 = 0 和μ ( x ) = e λ + 1 e f ( x ) \mu(x)=e^{\lambda + 1}e^{f(x)} μ ( x ) = e λ + 1 e f ( x ) 两者的分段组合。由于指数函数必定大于零,为了使目标泛函取到最大值,应当全部取μ ( x ) = e λ + 1 e f ( x ) \mu(x)=e^{\lambda + 1}e^{f(x)} μ ( x ) = e λ + 1 e f ( x ) 的部分。代回归一化条件,就得到原问题的最优解:
μ ∗ ( x ) = 1 Z e f ( x ) \mu^{*}(x) = \dfrac{1}{Z}e^{f(x)}
μ ∗ ( x ) = Z 1 e f ( x )
其中,Z = ∫ D e f ( x ) d x \displaystyle Z = \int_{\mathcal{D}}e^{f(x)}\mathrm{d}x Z = ∫ D e f ( x ) d x 是归一化系数。此时,优化问题取到最大值,为
J ∗ = ∫ D μ ∗ ( x ) f ( x ) − μ ∗ ( x ) log μ ∗ ( x ) d x = ∫ D 1 Z e f ( x ) f ( x ) − 1 Z e f ( x ) ( f ( x ) − log Z ) d x = log Z Z ∫ D e f ( x ) d x = log Z = log ∫ D e f ( x ) d x \begin{aligned}
J^{*} &= \int_{\mathcal{D}}\mu^{*}(x)f(x) - \mu^{*}(x)\log\mu^{*}(x) \mathrm{d}x\\
&= \int_{\mathcal{D}}\dfrac{1}{Z}e^{f(x)}f(x) - \dfrac{1}{Z}e^{f(x)}(f(x) - \log Z) \mathrm{d}x\\
&= \dfrac{\log Z}{Z}\int_{\mathcal{D}}e^{f(x)}\mathrm{d}x\\
&= \log Z\\
&= \log\int_{\mathcal{D}}e^{f(x)}\mathrm{d}x
\end{aligned}
J ∗ = ∫ D μ ∗ ( x ) f ( x ) − μ ∗ ( x ) log μ ∗ ( x ) d x = ∫ D Z 1 e f ( x ) f ( x ) − Z 1 e f ( x ) ( f ( x ) − log Z ) d x = Z log Z ∫ D e f ( x ) d x = log Z = log ∫ D e f ( x ) d x
对照原迭代方程,将f ( x ) f(x) f ( x ) 改为f ( a ) = E s ∼ D [ Q ( s , a ) ] f(a) = \mathbb{E}_{s\sim\mathcal{D}}[Q(s,a)] f ( a ) = E s ∼ D [ Q ( s , a )] ,积分改为在动作空间上求和,上式就变为:
J ∗ = log ∑ a exp ( E s ∼ D [ Q ( s , a ) ] ) J^{*} = \log\sum_a \operatorname{exp}(\mathbb{E}_{s\sim\mathcal{D}}[Q(s,a)])
J ∗ = log a ∑ exp ( E s ∼ D [ Q ( s , a )])
此处的期望是对s s s 而言的,与a a a 无关,因此可以把期望移到最前面,得到
J ∗ = E s ∼ D [ log ∑ a exp ( Q ( s , a ) ) ] J^{*} = \mathbb{E}_{s\sim\mathcal{D}}\bigg[\log\sum_a\operatorname{exp}(Q(s,a))\bigg]
J ∗ = E s ∼ D [ log a ∑ exp ( Q ( s , a )) ]
至此,式中已经不含μ \mu μ ,完成化简。
18.7 参考文献
[1] LEVINE S, KUMAR A, TUCKER. G, et al. Offline reinforcement learning: tutorial, review, and perspectives on open problems [J]. 2020.
[2] FUJIMOTO S, MEGER D, PRECUP D. Off-policy deep reinforcement learning without exploration [C] // International conference on machine learning, PMLR, 2019.
[3] KINGMA D P, WELLING M. Auto-encoding variational bayes [C] // International conference on learning representations, ICLR, 2014.
[4] KUMAR A, ZHOU A, TUCKER G, et al. Conservative q-learning for offline reinforcement learning [J]. NeurIPS, 2020.
[5] KIDAMBL R, RAJESWARAN A, NETRAPALLI P, et al. MOReL: Model-based offline reinforcement learning [J]. Advances in neural information processing systems, 2020: 33.
[6] YU T, THOMAS G, YU L, et al. MOPO: Model-based offline policy optimization [J]. Advances in neural information processing systems, 2020: 33.
19 目标导向的强化学习
19.1 简介
前文已经学习了 PPO、SAC 等经典的深度强化学习算法,大部分算法都能在各自的任务中取得比较好的效果,但是它们都局限在单个任务上,换句话说,对于训练完的算法,在使用时它们都只能完成一个特定的任务。如果面对较为复杂的复合任务,之前的强化学习算法往往不容易训练出有效的策略。本章将介绍目标导向的强化学习 (goal-oriented reinforcement learning,GoRL)以及该类别下的一种经典算法 HER。GoRL 可以学习一个策略,使其可以在不同的目标 (goal)作为条件下奏效,以此来解决较为复杂的决策任务。
19.2 问题定义
在介绍概念之前,先介绍一个目标导向的强化学习的实际场景。例如,策略π \pi π 要操控机械臂抓取桌子上的一个物体。值得注意的是,每一次任务开始,物体的位置可能是不同的,也就是说,智能体需要完成一系列相似并不同的任务。在使用传统的强化学习算法时,采用单一策略只能抓取同一个位置的物体。对于不同的目标位置,要训练多个策略。想象一下,在悬崖漫步环境中,若目标位置变成了右上角,便只能重新训练一个策略。同一个策略无法完成一系列不同的目标。
接下来讨论 GoRL 的数学形式。有别于一般的强化学习算法中定义的马尔可夫决策过程,在目标导向的强化学习中,使用一个扩充过的元组( S , A , P , r g , G , ϕ ) (\mathcal{S,A},P,r_g,\mathcal{G},\phi) ( S , A , P , r g , G , ϕ ) 来定义 MDP,其中,S \mathcal{S} S 是状态空间,A \mathcal{A} A 是动作空间,P P P 是状态转移函数,G \mathcal{G} G 是目标空间,ϕ \phi ϕ 是一个将状态s s s 从状态空间映射为目标空间内的一个目标g g g 的函数,r g r_g r g 是奖励函数,与目标g g g 有关。接下来详细介绍目标导向的强化学习中与一般强化学习不同的概念。
首先是补充的目标空间G \mathcal{G} G 和目标g g g 。在目标导向的强化学习中,任务是由目标定义的,并且目标本身是和状态s s s 相关的,可以将一个状态s s s 使用映射函数ϕ \phi ϕ 映射为目标ϕ ( s ) ∈ G \phi(s)\in\mathcal{G} ϕ ( s ) ∈ G 。继续使用之前机械臂抓取物体的任务作为例子:状态s s s 中包含了机械臂的力矩、物体的位置等信息。因为任务是抓取物体,所以规定目标g g g 是物体的位置,此时映射函数ϕ \phi ϕ 相当于一个从状态s s s 中提取物体位置的函数。
然后介绍奖励函数,奖励函数不仅与状态s s s 和动作a a a 相关,在目标导向强化学习中,还与设定的目标相关,以下是其中一种常见的形式:
r g ( s t , a t , s t + 1 ) = { 0 , ∣ ∣ ϕ ( s t + 1 ) − g ∣ ∣ 2 ≤ δ g − 1 , otherwise r_g(s_t,a_t,s_{t+1}) = \begin{cases}
0, &||\phi(s_{t+1}) - g||_2 \le \delta_g \\
-1, &\text{otherwise}
\end{cases}
r g ( s t , a t , s t + 1 ) = { 0 , − 1 , ∣∣ ϕ ( s t + 1 ) − g ∣ ∣ 2 ≤ δ g otherwise
其中,δ g \delta_g δ g 是一个比较小的值,表示到达目标附近就不会受到− 1 -1 − 1 的惩罚。在目标导向强化学习中,由于对于不同的目标,奖励函数是不同的,因此状态价值函数V ( s , g ) V(s,g) V ( s , g ) 也是基于目标的,动作状态价值函数Q ( s , a , g ) Q(s,a,g) Q ( s , a , g ) 同理。接下来介绍目标导向的强化学习的优化目标。定义ν 0 \nu_0 ν 0 为环境中初始状态s t s_t s t 与目标g g g 的联合分布,那么 GoRL 的目标为优化策略π ( a ∣ s , g ) \pi(a|s,g) π ( a ∣ s , g ) ,使以下目标函数最大化:
E ( s 0 , g ) ∼ ν 0 [ V π ( s 0 , g ) ] \mathbb{E}_{(s_0,g)\sim\nu_0} [V^{\pi}(s_0, g)]
E ( s 0 , g ) ∼ ν 0 [ V π ( s 0 , g )]
19.3 HER 算法
根据 19.2 节的定义,可以发现目标导向的强化学习的奖励往往是非常稀疏的。由于智能体在训练初期难以完成目标而只能得到− 1 -1 − 1 的奖励,从而使整个算法的训练速度较慢。那么,有没有一种方法能有效地利用这些“失败”的经验呢?从这个角度出发,事后经验回放 (hindsight experience replay,HER)算法于 2017 年神经信息处理系统 (Neural Information Processing Systems,NeurIPS)大会中被提出,成为 GoRL 的一大经典方法。
假设现在使用策略π \pi π 在环境中以g g g 为目标进行探索,得到了这么一条轨迹:s 1 , s 2 , ⋯ , s T s_1,s_2,\cdots,s_T s 1 , s 2 , ⋯ , s T ,并且g ≠ s 1 , s 2 , ⋯ , s T g\ne s_1,s_2,\cdots,s_T g = s 1 , s 2 , ⋯ , s T 。这意味着这整一条轨迹上,我们得到的奖励值都是− 1 -1 − 1 ,这对我们的训练起到的帮助很小。那么,如果我们换一个目标g ′ g' g ′ 来重新审视整条轨迹呢?换句话说,虽然并没有达到目标g g g ,但是策略在探索的过程中,完成了s 1 , s 2 , ⋯ , s T s_1,s_2,\cdots,s_T s 1 , s 2 , ⋯ , s T 等对应的目标,即完成了ϕ ( s 1 ) , ϕ ( s 2 ) , ⋯ , ϕ ( s T ) \phi(s_1),\phi(s_2),\cdots,\phi(s_T) ϕ ( s 1 ) , ϕ ( s 2 ) , ⋯ , ϕ ( s T ) 等目标。如果用这些目标来将原先的目标g g g 替换成新的目标g ′ g' g ′ ,重新计算轨迹中的奖励值,就能使策略从失败的经验中得到对训练有用的信息。
下面来看看具体的算法流程。值得注意的是,这里的策略优化算法可以选择任意合适的算法,比如 DQN、DDPG 等。
初始化策略π \pi π 的参数θ \theta θ ,初始化经验回放池R R R
For 序列 e = 0 → E e=0\rightarrow E e = 0 → E do
根据环境给予的目标g g g 和初始状态s 0 s_0 s 0 ,使用π \pi π 在环境中采样得到轨迹{ s 0 , a 0 , r 0 ⋯ , s T , a T , r T , s T + 1 } \{s_0,a_0,r_0\cdots,s_T,a_T,r_T,s_{T+1}\} { s 0 , a 0 , r 0 ⋯ , s T , a T , r T , s T + 1 } ,将其以( s , a , r , s ′ , g ) (s,a,r,s',g) ( s , a , r , s ′ , g ) 的形式存入R R R 中
从R R R 中采样N N N 个( s , a , r , s ′ , g ) (s,a,r,s',g) ( s , a , r , s ′ , g ) 元组
对于这些元组,选择一个状态s ′ ′ s'' s ′′ ,将其映射为新的目标g ′ = ϕ ( s ′ ′ ) g'=\phi(s'') g ′ = ϕ ( s ′′ ) 并计算新的奖励值r ′ = r g ′ ( s , a , s ′ ) r'=r_{g'}(s,a,s') r ′ = r g ′ ( s , a , s ′ ) ,然后用新的数据( s , a , r ′ , s ′ , g ′ ) (s,a,r',s',g') ( s , a , r ′ , s ′ , g ′ ) 替换原先的元组
使用这些新元组,对策略π \pi π 进行训练
End for
对于算法中状态s ′ ′ s'' s ′′ 的选择,HER 提出了 3 种不同的方案。
future: 选择与被改写的元组{ s , a , s ′ , g } \{s,a,s',g\} { s , a , s ′ , g } 处于同一个轨迹并在时间上处于s s s 之后的某个状态作为s ′ ′ s'' s ′′ 。
episode: 选择与被改写的元组{ s , a , s ′ , g } \{s,a,s',g\} { s , a , s ′ , g } 处于同一个轨迹的某个状态作为s ′ ′ s'' s ′′ 。
random: 选择经验回放池中的某个状态作为s ′ ′ s'' s ′′ 。
在 HER 的实验中,future 方案给出了最好的效果,该方案也最直观。因此在代码实现中用的是 future 方案。
19.4 HER 代码实践
接下来看看如何实现 HER 算法。首先定义一个简单二维平面上的环境。在一个二维网格世界上,每个维度的位置范围是[ 0 , 5 ] [0,5] [ 0 , 5 ] ,在每一个序列的初始,智能体都处于( 0 , 0 ) (0,0) ( 0 , 0 ) 的位置,环境将自动从3.5 ≤ x , y ≤ 4.5 3.5 \le x,y \le 4.5 3.5 ≤ x , y ≤ 4.5 的矩形区域内生成一个目标。每个时刻智能体可以选择纵向和横向分别移动[ − 1 , 1 ] [-1,1] [ − 1 , 1 ] 作为这一时刻的动作。当智能体距离目标足够近的时候,它将收到0 0 0 的奖励并结束任务,否则奖励为− 1 -1 − 1 。每一条序列的最大长度为 50 50 50 。环境示意图如图 19-1 所示。
图19-1 环境示意图
使用 Python 实现这个环境。导入一些需要用到的包,并且用代码来定义该环境。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import torchimport torch.nn.functional as Fimport numpy as npimport randomfrom tqdm import tqdmimport collectionsimport matplotlib.pyplot as pltclass WorldEnv : def __init__ (self ): self.distance_threshold = 0.15 self.action_bound = 1 def reset (self ): self.goal = np.array([4 + random.uniform(-0.5 , 0.5 ), 4 + random.uniform(-0.5 , 0.5 )]) self.state = np.array([0 , 0 ]) self.count = 0 return np.hstack((self.state, self.goal)) def step (self, action ): action = np.clip(action, -self.action_bound, self.action_bound) x = max (0 , min (5 , self.state[0 ] + action[0 ])) y = max (0 , min (5 , self.state[1 ] + action[1 ])) self.state = np.array([x, y]) self.count += 1 dis = np.sqrt(np.sum (np.square(self.state - self.goal))) reward = -1.0 if dis > self.distance_threshold else 0 if dis <= self.distance_threshold or self.count == 50 : done = True else : done = False return np.hstack((self.state, self.goal)), reward, done
接下来实现 DDPG 算法中用到的与 Actor 网络和 Critic 网络的网络结构相关的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class PolicyNet (torch.nn.Module): def __init__ (self, state_dim, hidden_dim, action_dim, action_bound ): super (PolicyNet, self).__init__() self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) self.fc3 = torch.nn.Linear(hidden_dim, action_dim) self.action_bound = action_bound def forward (self, x ): x = F.relu(self.fc2(F.relu(self.fc1(x)))) return torch.tanh(self.fc3(x)) * self.action_boundclass QValueNet (torch.nn.Module): def __init__ (self, state_dim, hidden_dim, action_dim ): super (QValueNet, self).__init__() self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) self.fc3 = torch.nn.Linear(hidden_dim, 1 ) def forward (self, x, a ): cat = torch.cat([x, a], dim=1 ) x = F.relu(self.fc2(F.relu(self.fc1(cat)))) return self.fc3(x)
在定义好 Actor 和 Critic 的网络结构之后,来看一下 DDPG 算法的代码。这部分代码和 13.3 节中的代码基本一致,主要区别在于 13.3 节中的 DDPG 算法是在倒立摆环境中运行的,动作只有 1 维,而这里的环境中动作有 2 维,导致一小部分代码不同。读者可以先思考一下此时应该修改哪一部分代码,然后自行对比,就能找到不同之处。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 class DDPG : ''' DDPG算法 ''' def __init__ (self, state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, sigma, tau, gamma, device ): self.action_dim = action_dim self.actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device) self.critic = QValueNet(state_dim, hidden_dim, action_dim).to(device) self.target_actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device) self.target_critic = QValueNet(state_dim, hidden_dim, action_dim).to(device) self.target_critic.load_state_dict(self.critic.state_dict()) self.target_actor.load_state_dict(self.actor.state_dict()) self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr = actor_lr) self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr = critic_lr) self.gamma = gamma self.sigma = sigma self.tau = tau self.action_bound = action_bound self.device = device def take_action (self, state ): state = torch.tensor([state], dtype=torch.float ).to(self.device) action = self.actor(state).detach().cpu().numpy()[0 ] action = action + self.sigma * np.random.randn(self.action_dim) return action def soft_update (self, net, target_net ): for param_target, param in zip (target_net.parameters(), net.parameters()): param_target.data.copy_(param_target.data * (1.0 - self.tau) + param.data * self.tau) def update (self, transition_dict ): states = torch.tensor(transition_dict['states' ], dtype=torch.float ).to(self.device) actions = torch.tensor(transition_dict['actions' ], dtype=torch.float ).to(self.device) rewards = torch.tensor(transition_dict['rewards' ], dtype=torch.float ).view(-1 , 1 ).to(self.device) next_states = torch.tensor(transition_dict['next_states' ], dtype=torch.float ).to(self.device) dones = torch.tensor(transition_dict['dones' ], dtype=torch.float ).view(-1 , 1 ).to(self.device) next_q_values = self.target_critic(next_states, self.target_actor(next_states)) q_targets = rewards + self.gamma * next_q_values * (1 - dones) critic_loss = torch.mean(F.mse_loss(self.critic(states, actions), q_targets)) self.critic_optimizer.zero_grad() critic_loss.backward() self.critic_optimizer.step() actor_loss = -torch.mean(self.critic(states, self.actor(states))) self.actor_optimizer.zero_grad() actor_loss.backward() self.actor_optimizer.step() self.soft_update(self.actor, self.target_actor) self.soft_update(self.critic, self.target_critic)
接下来定义一个特殊的经验回放池,此时回放池内不再存储每一步的数据,而是存储一整条轨迹。这是 HER 算法中的核心部分,之后可以用 HER 算法从该经验回放池中构建新的数据来帮助策略训练。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 class Trajectory : ''' 用来记录一条完整轨迹 ''' def __init__ (self, init_state ): self.states = [init_state] self.actions = [] self.rewards = [] self.dones = [] self.length = 0 def store_step (self, action, state, reward, done ): self.actions.append(action) self.states.append(state) self.rewards.append(reward) self.dones.append(done) self.length += 1 class ReplayBuffer_Trajectory : ''' 存储轨迹的经验回放池 ''' def __init__ (self, capacity ): self.buffer = collections.deque(maxlen=capacity) def add_trajectory (self, trajectory ): self.buffer.append(trajectory) def size (self ): return len (self.buffer) def sample (self, batch_size, use_her, dis_threshold=0.15 , her_ratio=0.8 ): batch = dict ( states=[], actions=[], next_states=[], rewards=[], dones=[] ) for _ in range (batch_size): traj = random.sample(self.buffer, 1 )[0 ] step_state = np.random.randint(traj.length) state = traj.states[step_state] next_state = traj.states[step_state + 1 ] action = traj.actions[step_state] reward = traj.rewards[step_state] done = traj.dones[step_state] if use_her and np.random.uniform() <= her_ratio: step_goal = np.random.randint(step_state + 1 , traj.length + 1 ) goal = traj.states[step_goal][:2 ] dis = np.sqrt(np.sum (np.square(next_state[:2 ] - goal))) reward = -1.0 if dis > dis_threshold else 0 done = False if dis > dis_threshold else True state = np.hstack((state[:2 ], goal)) next_state = np.hstack((next_state[:2 ], goal)) batch['states' ].append(state) batch['next_states' ].append(next_state) batch['actions' ].append(action) batch['rewards' ].append(reward) batch['dones' ].append(done) batch['states' ] = np.array(batch['states' ]) batch['next_states' ] = np.array(batch['next_states' ]) batch['actions' ] = np.array(batch['actions' ]) return batch
最后,便可以开始在这个有目标的环境中运行采用了 HER 的 DDPG 算法,一起来看一下效果吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 actor_lr = 1e-3 critic_lr = 1e-3 hidden_dim = 128 state_dim = 4 action_dim = 2 action_bound = 1 sigma = 0.1 tau = 0.005 gamma = 0.98 num_episodes = 2000 n_train = 20 batch_size = 256 minimal_episodes = 200 buffer_size = 10000 device = torch.device("cuda" ) if torch.cuda.is_available() else torch.device("cpu" ) random.seed(0 ) np.random.seed(0 ) torch.manual_seed(0 ) env = WorldEnv() replay_buffer = ReplayBuffer_Trajectory(buffer_size) agent = DDPG(state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, sigma, tau, gamma, device) return_list = []for i in range (10 ): with tqdm(total=int (num_episodes / 10 ), desc='Iteration %d' % i) as pbar: for i_episode in range (int (num_episodes / 10 )): episode_return = 0 state = env.reset() traj = Trajectory(state) done = False while not done: action = agent.take_action(state) state, reward, done = env.step(action) episode_return += reward traj.store_step(action, state, reward, done) replay_buffer.add_trajectory(traj) return_list.append(episode_return) if replay_buffer.size() >= minimal_episodes: for _ in range (n_train): transition_dict = replay_buffer.sample(batch_size, True ) agent.update(transition_dict) if (i_episode + 1 ) % 10 == 0 : pbar.set_postfix({ 'episode' : '%d' % (num_episodes / 10 * i + i_episode + 1 ), 'return' : '%.3f' % np.mean(return_list[-10 :]) }) pbar.update(1 ) episodes_list = list (range (len (return_list))) plt.plot(episodes_list, return_list) plt.xlabel('Episodes' ) plt.ylabel('Returns' ) plt.title('DDPG with HER on {}' .format ('GridWorld' )) plt.show()
1 2 3 4 5 6 7 8 9 10 Iteration 0: 100%|██████████| 200/200 [00:08<00:00, 24.96it/s, episode=200, return=-50.000] Iteration 1: 100%|██████████| 200/200 [01:41<00:00, 1.96it/s, episode=400, return=-4.400] Iteration 2: 100%|██████████| 200/200 [01:37<00:00, 2.06it/s, episode=600, return=-4.000] Iteration 3: 100%|██████████| 200/200 [01:36<00:00, 2.07it/s, episode=800, return=-4.100] Iteration 4: 100%|██████████| 200/200 [01:35<00:00, 2.09it/s, episode=1000, return=-4.500] Iteration 5: 100%|██████████| 200/200 [01:34<00:00, 2.11it/s, episode=1200, return=-4.500] Iteration 6: 100%|██████████| 200/200 [01:36<00:00, 2.08it/s, episode=1400, return=-4.600] Iteration 7: 100%|██████████| 200/200 [01:35<00:00, 2.09it/s, episode=1600, return=-4.100] Iteration 8: 100%|██████████| 200/200 [01:35<00:00, 2.09it/s, episode=1800, return=-4.300] Iteration 9: 100%|██████████| 200/200 [01:35<00:00, 2.09it/s, episode=2000, return=-3.600]
接下来尝试不采用 HER 重新构造数据,而是直接使用收集的数据训练一个策略,看看是什么效果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 random.seed(0 ) np.random.seed(0 ) torch.manual_seed(0 ) env = WorldEnv() replay_buffer = ReplayBuffer_Trajectory(buffer_size) agent = DDPG(state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, sigma, tau, gamma, device) return_list = []for i in range (10 ): with tqdm(total=int (num_episodes / 10 ), desc='Iteration %d' % i) as pbar: for i_episode in range (int (num_episodes / 10 )): episode_return = 0 state = env.reset() traj = Trajectory(state) done = False while not done: action = agent.take_action(state) state, reward, done = env.step(action) episode_return += reward traj.store_step(action, state, reward, done) replay_buffer.add_trajectory(traj) return_list.append(episode_return) if replay_buffer.size() >= minimal_episodes: for _ in range (n_train): transition_dict = replay_buffer.sample(batch_size, False ) agent.update(transition_dict) if (i_episode + 1 ) % 10 == 0 : pbar.set_postfix({ 'episode' : '%d' % (num_episodes / 10 * i + i_episode + 1 ), 'return' : '%.3f' % np.mean(return_list[-10 :]) }) pbar.update(1 ) episodes_list = list (range (len (return_list))) plt.plot(episodes_list, return_list) plt.xlabel('Episodes' ) plt.ylabel('Returns' ) plt.title('DDPG without HER on {}' .format ('GridWorld' )) plt.show()
1 2 3 4 5 6 7 8 9 10 Iteration 0: 100%|██████████| 200/200 [00:03<00:00, 65.09it/s, episode=200, return=-50.000] Iteration 1: 100%|██████████| 200/200 [00:45<00:00, 4.42it/s, episode=400, return=-50.000] Iteration 2: 100%|██████████| 200/200 [00:46<00:00, 4.28it/s, episode=600, return=-50.000] Iteration 3: 100%|██████████| 200/200 [00:48<00:00, 4.14it/s, episode=800, return=-50.000] Iteration 4: 100%|██████████| 200/200 [00:47<00:00, 4.24it/s, episode=1000, return=-50.000] Iteration 5: 100%|██████████| 200/200 [00:46<00:00, 4.28it/s, episode=1200, return=-50.000] Iteration 6: 100%|██████████| 200/200 [00:46<00:00, 4.27it/s, episode=1400, return=-50.000] Iteration 7: 100%|██████████| 200/200 [00:48<00:00, 4.14it/s, episode=1600, return=-40.600] Iteration 8: 100%|██████████| 200/200 [00:47<00:00, 4.18it/s, episode=1800, return=-50.000] Iteration 9: 100%|██████████| 200/200 [00:50<00:00, 3.99it/s, episode=2000, return=-31.500]
通过实验对比,可以观察到使用 HER 算法后,效果有显著提升。这里 HER 算法的主要好处是通过重新对历史轨迹设置其目标(使用 future 方案)而使得奖励信号更加稠密,进而从原本失败的数据中学习到使“新任务”成功的经验,提升训练的稳定性和样本效率。
19.5 小结
本章介绍了目标导向的强化学习(GoRL)的基本定义,以及一个解决 GoRL 的有效的经典算法 HER。通过代码实践,HER 算法的效果得到了很好的呈现。我们从 HER 的代码实践中还可以领会一种思维方式,即可以通过整条轨迹的信息来改善每个转移片段带给智能体策略的学习价值。例如,在 HER 算法的 future 方案中,采样当前轨迹后续的状态作为目标,然后根据下一步状态是否离目标足够近来修改当前步的奖励信号。此外,HER 算法只是一个经验回放的修改方式,并没有对策略网络和价值网络的架构做出任何修改。而在后续的部分 GoRL 研究中,策略函数和动作价值函数会被显式建模成π ( a ∣ s , g ) \pi(a|s,g) π ( a ∣ s , g ) 和Q ( s , a , g ) Q(s,a,g) Q ( s , a , g ) ,即构建较为复杂的策略架构,使其直接知晓当前状态和目标,并使用更大的网络容量去完成目标。有兴趣的读者可以自行查阅相关的文献。
19.6 参考文献
[1] ANDRYCHOWICZ M, WOLSKI F, RAY A, et al. Hindsight Experience Replay [J]. Advances in neural information processing systems, 2017: 5055-5065.
[2] FLORENSA C, HELD D, GENG X Y, et al. Automatic goal generation for reinforcement learning agents [C]// International conference on machine learning, PMLR, 2018: 1515-1528.
[3] REN Z Z, DONG K, ZHOU Y, et al. Exploration via Hindsight Goal Generation [J]. Advances in neural information processing systems 2019, 32: 13485-13496.
[4] PITIS S, CHAN H, ZHAO S, et al. Maximum entropy gain exploration for long horizon multi-goal reinforcement learning [C]// International conference on machine learning, PMLR, 2020: 7750-7761.
20 多智能体强化学习入门
20.1 简介
本书之前介绍的算法都是单智能体强化学习算法,其基本假设是动态环境是稳态的 (stationary),即状态转移概率和奖励函数不变,并依此来设计相应的算法。而如果环境中还有其他智能体做交互和学习,那么任务则上升为多智能体强化学习 (multi-agent reinforcement learning,MARL),如图 20-1 所示。
图20-1 多智能体强化学习环境概览
多智能体的情形相比于单智能体更加复杂,因为每个智能体在和环境交互的同时也在和其他智能体进行直接或者间接的交互。因此,多智能体强化学习要比单智能体更困难,其难点主要体现在以下几点:
由于多个智能体在环境中进行实时动态交互,并且每个智能体在不断学习并更新自身策略,因此在每个智能体的视角下,环境是非稳态的 (non-stationary),即对于一个智能体而言,即使在相同的状态下采取相同的动作,得到的状态转移和奖励信号的分布可能在不断改变;
多个智能体的训练可能是多目标的,不同智能体需要最大化自己的利益;
训练评估的复杂度会增加,可能需要大规模分布式训练来提高效率。
20.2 问题建模
将一个多智能体环境用一个元组( N , S , A , R , P ) (N,\mathcal{S,A,R}, P) ( N , S , A , R , P ) 表示,其中N N N 是智能体的数目,S = S 1 , × ⋯ × S N \mathcal{S}=S_1,\times\cdots\times S_N S = S 1 , × ⋯ × S N 是所有智能体的状态集合,A = A 1 × ⋯ × A N \mathcal{A}=A_1\times\cdots\times A_N A = A 1 × ⋯ × A N 是所有智能体的动作集合,R = r 1 × ⋯ × r N \mathcal{R}=r_1\times\cdots\times r_N R = r 1 × ⋯ × r N 是所有智能体奖励函数的集合,P P P 是环境的状态转移概率。一般多智能体强化学习的目标是为每个智能体学习一个策略来最大化其自身的累积奖励。
20.3 多智能体强化学习的基本求解范式
面对上述问题形式,最直接的想法是基于已经熟悉的单智能体算法来进行学习,这主要分为两种思路。
完全中心化 (fully centralized)方法:将多个智能体进行决策当作一个超级智能体在进行决策,即把所有智能体的状态聚合在一起当作一个全局的超级状态,把所有智能体的动作连起来作为一个联合动作。这样做的好处是,由于已经知道了所有智能体的状态和动作,因此对这个超级智能体来说,环境依旧是稳态的,一些单智能体的算法的收敛性依旧可以得到保证。然而,这样的做法不能很好地扩展到智能体数量很多或者环境很大的情况,因为这时候将所有的信息简单暴力地拼在一起会导致维度爆炸,训练复杂度巨幅提升的问题往往不可解决。
完全去中心化 (fully decentralized)方法:与完全中心化方法相反的范式便是假设每个智能体都在自身的环境中独立地进行学习,不考虑其他智能体的改变。完全去中心化方法直接对每个智能体用一个单智能体强化学习算法来学习。这样做的缺点是环境是非稳态的,训练的收敛性不能得到保证,但是这种方法的好处在于随着智能体数量的增加有比较好的扩展性,不会遇到维度灾难而导致训练不能进行下去。
本章介绍完全去中心化方法,在原理解读和代码实践之后,进一步通过实验结果图看看这种方法的效果。第 21 章会进一步介绍进阶的多智能体强化学习的求解范式。
20.4 IPPO 算法
接下来将介绍一个完全去中心化的算法,此类算法被称为独立学习 (independent learning)。由于对于每个智能体使用单智能体算法 PPO 进行训练,所因此这个算法叫作独立PPO (Independent PPO,IPPO)算法。具体而言,这里使用的 PPO 算法版本为 PPO-截断,其算法流程如下:
对于N N N 个智能体,为每个智能体初始化各自的策略以及价值函数
for 训练轮数 k = 0 , 1 , 2 , ⋯ k=0, 1,2,\cdots k = 0 , 1 , 2 , ⋯ do
所有智能体在环境中交互分别获得各自的一条轨迹数据
对每个智能体,基于当前的价值函数用 GAE 计算优势函数的估计
对每个智能体,通过最大化其 PPO-截断的目标来更新其策略
对每个智能体,通过均方误差损失函数优化其价值函数
end for
20.5 IPPO 代码实践
下面介绍一下要使用的多智能体环境:ma_gym
库中的 Combat 环境。Combat 是一个在二维的格子世界上进行的两个队伍的对战模拟游戏,每个智能体的动作集合为:向四周移动1 1 1 格,攻击周围3 × 3 3\times 3 3 × 3 格范围内其他敌对智能体,或者不采取任何行动。起初每个智能体有 3 3 3 点生命值,如果智能体在敌人的攻击范围内被攻击到了,则会扣 1 1 1 生命值,生命值掉为 0 0 0 则死亡,最后存活的队伍获胜。每个智能体的攻击有一轮的冷却时间。
在游戏中,我们能够控制一个队伍的所有智能体与另一个队伍的智能体对战。另一个队伍的智能体使用固定的算法:攻击在范围内最近的敌人,如果攻击范围内没有敌人,则向敌人靠近。图 20-2 是一个简单的 Combat 环境示例。
图20-2 Combat 环境示例
首先仍然导入一些需要用到的包,然后从 GitHub 中克隆ma-gym
仓库到本地,并且导入其中的 Combat 环境。
1 2 3 4 5 6 7 8 9 10 11 import torchimport torch.nn.functional as Fimport numpy as npimport rl_utilsfrom tqdm import tqdmimport matplotlib.pyplot as plt ! git clone https://github.com/boyu-ai/ma-gym.gitimport sys sys.path.append("./ma-gym" )from ma_gym.envs.combat.combat import Combat
接下来的代码块与 12.4 节介绍过的 PPO 代码实践基本一致,不再赘述。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 class PolicyNet (torch.nn.Module): def __init__ (self, state_dim, hidden_dim, action_dim ): super (PolicyNet, self).__init__() self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) self.fc3 = torch.nn.Linear(hidden_dim, action_dim) def forward (self, x ): x = F.relu(self.fc2(F.relu(self.fc1(x)))) return F.softmax(self.fc3(x), dim=1 )class ValueNet (torch.nn.Module): def __init__ (self, state_dim, hidden_dim ): super (ValueNet, self).__init__() self.fc1 = torch.nn.Linear(state_dim, hidden_dim) self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) self.fc3 = torch.nn.Linear(hidden_dim, 1 ) def forward (self, x ): x = F.relu(self.fc2(F.relu(self.fc1(x)))) return self.fc3(x)class PPO : ''' PPO算法,采用截断方式 ''' def __init__ (self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, eps, gamma, device ): self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device) self.critic = ValueNet(state_dim, hidden_dim).to(device) self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr) self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr) self.gamma = gamma self.lmbda = lmbda self.eps = eps self.device = device def take_action (self, state ): state = torch.tensor([state], dtype=torch.float ).to(self.device) probs = self.actor(state) action_dist = torch.distributions.Categorical(probs) action = action_dist.sample() return action.item() def update (self, transition_dict ): states = torch.tensor(transition_dict['states' ], dtype=torch.float ).to(self.device) actions = torch.tensor(transition_dict['actions' ]).view(-1 , 1 ).to( self.device) rewards = torch.tensor(transition_dict['rewards' ], dtype=torch.float ).view(-1 , 1 ).to(self.device) next_states = torch.tensor(transition_dict['next_states' ], dtype=torch.float ).to(self.device) dones = torch.tensor(transition_dict['dones' ], dtype=torch.float ).view(-1 , 1 ).to(self.device) td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones) td_delta = td_target - self.critic(states) advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device) old_log_probs = torch.log(self.actor(states).gather(1 , actions)).detach() log_probs = torch.log(self.actor(states).gather(1 , actions)) ratio = torch.exp(log_probs - old_log_probs) surr1 = ratio * advantage surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage actor_loss = torch.mean(-torch.min (surr1, surr2)) critic_loss = torch.mean( F.mse_loss(self.critic(states), td_target.detach())) self.actor_optimizer.zero_grad() self.critic_optimizer.zero_grad() actor_loss.backward() critic_loss.backward() self.actor_optimizer.step() self.critic_optimizer.step()
现在进入 IPPO 代码实践的最主要部分。值得注意的是,在训练时使用了参数共享 (parameter sharing)的技巧,即对于所有智能体使用同一套策略参数,这样做的好处是能够使得模型训练数据更多,同时训练更稳定。能够这样做的前提是,两个智能体是同质的 (homogeneous),即它们的状态空间和动作空间是完全一致的,并且它们的优化目标也完全一致。感兴趣的读者也可以自行实现非参数共享版本的 IPPO,此时每个智能体就是一个独立的 PPO 的实例。
和之前的一些实验不同,这里不再展示智能体获得的回报,而是将 IPPO 训练的两个智能体团队的胜率作为主要的实验结果。接下来就可以开始训练 IPPO 了!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 actor_lr = 3e-4 critic_lr = 1e-3 num_episodes = 100000 hidden_dim = 64 gamma = 0.99 lmbda = 0.97 eps = 0.2 device = torch.device("cuda" ) if torch.cuda.is_available() else torch.device( "cpu" ) team_size = 2 grid_size = (15 , 15 ) env = Combat(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size) state_dim = env.observation_space[0 ].shape[0 ] action_dim = env.action_space[0 ].n agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, eps, gamma, device) win_list = []for i in range (10 ): with tqdm(total=int (num_episodes / 10 ), desc='Iteration %d' % i) as pbar: for i_episode in range (int (num_episodes / 10 )): transition_dict_1 = { 'states' : [], 'actions' : [], 'next_states' : [], 'rewards' : [], 'dones' : [] } transition_dict_2 = { 'states' : [], 'actions' : [], 'next_states' : [], 'rewards' : [], 'dones' : [] } s = env.reset() terminal = False while not terminal: a_1 = agent.take_action(s[0 ]) a_2 = agent.take_action(s[1 ]) next_s, r, done, info = env.step([a_1, a_2]) transition_dict_1['states' ].append(s[0 ]) transition_dict_1['actions' ].append(a_1) transition_dict_1['next_states' ].append(next_s[0 ]) transition_dict_1['rewards' ].append( r[0 ] + 100 if info['win' ] else r[0 ] - 0.1 ) transition_dict_1['dones' ].append(False ) transition_dict_2['states' ].append(s[1 ]) transition_dict_2['actions' ].append(a_2) transition_dict_2['next_states' ].append(next_s[1 ]) transition_dict_2['rewards' ].append( r[1 ] + 100 if info['win' ] else r[1 ] - 0.1 ) transition_dict_2['dones' ].append(False ) s = next_s terminal = all (done) win_list.append(1 if info["win" ] else 0 ) agent.update(transition_dict_1) agent.update(transition_dict_2) if (i_episode + 1 ) % 100 == 0 : pbar.set_postfix({ 'episode' : '%d' % (num_episodes / 10 * i + i_episode + 1 ), 'return' : '%.3f' % np.mean(win_list[-100 :]) }) pbar.update(1 )
1 2 3 4 5 6 7 8 9 10 Iteration 0: 100%|██████████| 10000/10000 [07:17<00:00, 22.85it/s, episode=10000, return=0.310] Iteration 1: 100%|██████████| 10000/10000 [05:43<00:00, 29.08it/s, episode=20000, return=0.370] Iteration 2: 100%|██████████| 10000/10000 [05:30<00:00, 30.26it/s, episode=30000, return=0.560] Iteration 3: 100%|██████████| 10000/10000 [04:54<00:00, 33.96it/s, episode=40000, return=0.670] Iteration 4: 100%|██████████| 10000/10000 [04:20<00:00, 38.46it/s, episode=50000, return=0.670] Iteration 5: 100%|██████████| 10000/10000 [03:52<00:00, 43.09it/s, episode=60000, return=0.620] Iteration 6: 100%|██████████| 10000/10000 [03:55<00:00, 42.53it/s, episode=70000, return=0.610] Iteration 7: 100%|██████████| 10000/10000 [03:40<00:00, 45.26it/s, episode=80000, return=0.640] Iteration 8: 100%|██████████| 10000/10000 [03:48<00:00, 43.81it/s, episode=90000, return=0.650] Iteration 9: 100%|██████████| 10000/10000 [03:42<00:00, 44.91it/s, episode=100000, return=0.770]
1 2 3 4 5 6 7 8 9 10 win_array = np.array(win_list) win_array = np.mean(win_array.reshape(-1 , 100 ), axis=1 ) episodes_list = np.arange(win_array.shape[0 ]) * 100 plt.plot(episodes_list, win_array) plt.xlabel('Episodes' ) plt.ylabel('Win rate' ) plt.title('IPPO on Combat' ) plt.show()
可以看出,当智能体数量较少的时候,IPPO 这种完全去中心化学习在一定程度上能够取得好的效果,但是最终达到的胜率也比较有限。这可能是因为多个智能体之间无法有效地通过合作来共同完成目标。同时,好奇的读者也可以尝试增加智能体的数量,比较一下训练结果。当数量增加到 5 时,这种完全去中心化学习的训练效果就不是很好了。这时候可能就需要引入更多的算法来考虑多个智能体之间的交互行为,或者使用中心化训练去中心化执行 (centralized training with decentralized execution,CTDE)的范式来进行多智能体训练,该方法将在第 21 章中详细介绍。
20.6 小结
本章介绍了多智能体强化学习的概念和两类基本的解决范式,并针对其中的完全去中心化方法进行了详细的介绍,讲解了一个具体的算法 IPPO,即用 PPO 算法为各个智能体训练各自的策略。在 Combat 环境中,我们共享了两个智能体之间的策略,以达到更好的效果。但这仅限于多个智能体同质的情况,若它们的状态空间或动作空间不一致,那便无法进行策略共享。
20.7 参考文献
[1] HERNANDEZ L P, BILAL K, TAYLOR M E. A survey and critique of multiagent deep reinforcement learning[J]. Autonomous Agents and Multi-Agent Systems, 2019, 33(6): 750-797.
[2] TAMPUU A, MATIISEN T, KODELJA D, et al. Multiagent cooperation and competition with deep reinforcement learning [J]. PloS One, 2017; 12(4): e0172395.
[3] TAN M. Multi-agent reinforcement learning: independent vs. cooperative agents [C]// International conference on machine learning, 1993: 330-337.
[4] Combat 环境(参见 GitHub 网站中的 koulanurag/ma-gym 项目).
21 多智能体强化学习进阶
21.1 简介
第 20 章中已经初步介绍了多智能体强化学习研究的问题和最基本的求解范式。本章来介绍一种比较经典且效果不错的进阶范式:中心化训练去中心化执行 (centralized training with decentralized execution,CTDE)。所谓中心化训练去中心化执行是指在训练的时候使用一些单个智能体看不到的全局信息而以达到更好的训练效果,而在执行时不使用这些信息,每个智能体完全根据自己的策略直接动作以达到去中心化执行的效果。中心化训练去中心化执行的算法能够在训练时有效地利用全局信息以达到更好且更稳定的训练效果,同时在进行策略模型推断时可以仅利用局部信息,使得算法具有一定的扩展性。CTDE 可以类比成一个足球队的训练和比赛过程:在训练时,11 个球员可以直接获得教练的指导从而完成球队的整体配合,而教练本身掌握着比赛全局信息,教练的指导也是从整支队、整场比赛的角度进行的;而训练好的 11 个球员在上场比赛时,则根据场上的实时情况直接做出决策,不再有教练的指导。
CTDE 算法主要分为两种:一种是基于值函数的方法,例如 VDN,QMIX 算法等;另一种是基于 Actor-Critic 的方法,例如 MADDPG 和 COMA 等。本章将重点介绍 MADDPG 算法。
21.2 MADDPG 算法
多智能体 DDPG(muli-agent DDPG,MADDPG)算法从字面意思上来看就是对于每个智能体实现一个 DDPG 的算法。所有智能体共享一个中心化的 Critic 网络,该 Critic 网络在训练的过程中同时对每个智能体的 Actor 网络给出指导,而执行时每个智能体的 Actor 网络则是完全独立做出行动,即去中心化地执行。
CTDE 算法的应用场景通常可以被建模为一个部分可观测马尔可夫博弈 (partially observable Markov games):用S \mathcal{S} S 代表N N N 个智能体所有可能的状态空间,这是全局的信息。对于每个智能体i i i ,其动作空间为A i \mathcal{A}_i A i ,观测空间为O i \mathcal{O}_i O i ,每个智能体的策略π θ i : O i × A i → [ 0 , 1 ] \pi_{\theta_i}: \mathcal{O}_i\times \mathcal{A}_i\rightarrow [0,1] π θ i : O i × A i → [ 0 , 1 ] 是一个概率分布,用来表示智能体在每个观测下采取各个动作的概率。环境的状态转移函数为T : S × A 1 × ⋯ × A N → Ω ( S ) \mathcal{T:S\times A_1 \times \cdots \times A_N \rightarrow \Omega(S)} T : S × A 1 × ⋯ × A N → Ω ( S ) 。每个智能体的奖励函数为r i : S × A → R r_i: \mathcal{S\times A}\rightarrow \mathbb{R} r i : S × A → R ,每个智能体从全局状态得到的部分观测信息为o i : S → O i \mathbf{o}_i: \mathcal{S\rightarrow O_i} o i : S → O i ,初始状态分布为ρ : S → [ 0 , 1 ] \rho:\mathcal{S}\rightarrow[0,1] ρ : S → [ 0 , 1 ] 。每个智能体的目标是最大化其期望累积奖励 E [ ∑ t = 0 T γ t r i t ] \displaystyle\mathbb{E}[\sum_{t=0}^T \gamma^t r_i^t] E [ t = 0 ∑ T γ t r i t ] 。
接下来我们看一下 MADDPG 算法的主要细节吧!如图 21-1 所示,每个智能体用 Actor-Critic 的方法训练,但不同于传统单智能体的情况,在 MADDPG 中每个智能体的 Critic 部分都能够获得其他智能体的策略信息。具体来说,考虑一个有N N N 个智能体的博弈,每个智能体的策略参数为θ = { θ 1 , ⋯ , θ N } \theta = \{\theta_1,\cdots,\theta_N\} θ = { θ 1 , ⋯ , θ N } ,记π = { π 1 , ⋯ , π N } \pi = \{\pi_1,\cdots,\pi_N\} π = { π 1 , ⋯ , π N } 为所有智能体的策略集合,那么我们可以写出在随机性策略情况下每个智能体的期望收益的策略梯度:
∇ θ i J ( θ i ) = E s ∼ p μ , a ∼ π i [ ∇ θ i log π i ( a i ∣ o i ) Q i π ( x , a 1 , ⋯ , a N ) ] \nabla_{\theta_i} J(\theta_i) = \mathbb{E}_{s\sim p^\mu, a\sim \pi_i} \Big[
\nabla_{\theta_i} \log\pi_i(a_i|o_i) Q_i^\pi (\mathbf{x}, a_1,\cdots,a_N)
\Big]
∇ θ i J ( θ i ) = E s ∼ p μ , a ∼ π i [ ∇ θ i log π i ( a i ∣ o i ) Q i π ( x , a 1 , ⋯ , a N ) ]
其中,Q i π ( x , a 1 , ⋯ , a N ) Q_i^\pi (\mathbf{x}, a_1,\cdots,a_N) Q i π ( x , a 1 , ⋯ , a N ) 就是一个中心化的动作价值函数。为什么说Q i Q_i Q i 是一个中心化的动作价值函数呢?一般来说x = ( o 1 , ⋯ , o N ) \mathbf{x}=(o_1,\cdots,o_N) x = ( o 1 , ⋯ , o N ) 包含了所有智能体的观测,另外Q i Q_i Q i 也需要输入所有智能体在此刻的动作,因此Q i Q_i Q i 工作的前提就是所有智能体要同时给出