循环确定性策略梯度(Recurrent Deterministic Policy Gradient,R DPG)算法在论文Memory-based control with recurrent neural networks中提出,属于策略梯度算法。和前面所介绍的基于值函数的DRL训练方法不同,策略梯度算法通过计算梯度来更新策略网络中的参数,使得整个策略网络朝着奖励增高的方向更新。
本章的2.1节讲到了DQN,可以说它是神经网络在RL中取得的重大突破,也为RL的发展提供了方向和基础,Sliver等人将其应用在Atari游戏中并达到了接近人类玩家的水平。后来大批量的论文均采用了DQN的思想,同时基于此提出了更多算法。但是该类算法有以下缺点:Atari游戏所需的动作是离散的,属于低维(只有少数几个动作),但现实生活中很多问题都是连续的,而且维度比较高,比如机器人控制(多个自由度)、汽车方向盘转向角度、油门大小、天气预报推荐指数等。虽然可以对连续性高维度的动作做离散型的处理,但是对于一个经过离散处理的大状态空间,使用DQN训练是一个比较棘手的问题,因为DQN算法的核心思想是利用随机策略进行探索,对于高维度来说,第一个问题是模型很难收敛,第二个问题是需要在探索和利用之间进行协调。
在考虑到基于值函数的更新方法的这些局限性后,DRL的研究者们转向了另一种策略更新方法,即策略梯度法。基于此方向,发展出了一些优秀的更新方法,如确定性策略梯度(Deterministic Policy Gradient,DPG)。
无模型的策略搜索方法可以分为随机策略搜索方法和确定性策略搜索方法。在2014年以前,学者们大都在发展随机策略搜索的方法,因为大家认为确定性策略梯度是不存在的。直到2014年,David Silver在论文《Deterministic Policy Gradient Algorithms》中提出了确定性策略理论,策略搜索方法才出现了确定性策略这种方法。
以前的随机搜索的公式为π θ (a|s)=P[a|s;θ],其含义是在给定状态s和策略网络参数θ时,输出的动作a服从一个概率分布,也就是说每次走进这个状态时,输出的动作可能不同。而确定性策略的公式是a=μ θ (s),其中的μ是一个确定性映射,给定状态和参数,输出的动作是确定的。
以上这些算法在基于MDP的环境中(也就是说环境的状态对于智能体来说是完全可观察的)得到了很好的运用。然而许多现实世界的控制问题只是部分可观察的。部分可观察性的原因很多,包括有些需要智能体记住的信息只是临时可得到的,例如导航任务中的路标、传感器限制或噪声,也包括由于功能近似导致的状态混叠等。部分可观察性自然也出现在涉及视觉控制的许多任务中,如动态场景的静态图像不提供关于速度的信息,由于世界的三维性质而发生遮挡,并且大多数视觉传感器受带宽限制,只有一个受限制的视野。
和MDP一样,POMDP是一个行动决策过程,由于无法像MDP一样得到当前真实完全的状态信息,它只能利用先前时间步骤的观察和行动知识来增强当前观察。在控制问题中使用DRL,显然需要一种方法来表示先前的观察结果;这些将有助于推断真实的当前状态。
由于真实的状态只能通过终端状态的完全置信传播,所以可以通过值估计来进行策略优化。一些循环神经网络(Recurrent Neural Network,RNN)的优秀变种的出现,如长短时记忆神经网络(Long Short Term Memory network,LSTM),使得智能体综合之前的历史信息特征成为可能。RDPG正是利用了RNN的这一特性,使之与确定性策略梯度相结合,得到了RDPG,解决了POMDP中的部分可观察问题。
RDPG是一类使用RNN来构造估计策略的确定性策略梯度算法。这种算法在POMDP中尤为适用,这是因为每个智能体只能观察到环境的一部分,环境的真实状态是无法得到的。所以为了逼近真实的状态,我们用从开始到现在的整个观察片段来逼近当前的整个环境状态。而RNN的使用,使得我们不需要每次从头开始处理所有的观察片段,之前一段时间内的所有观察片段的特征已经存储在了RNN中的隐含层状态中,而每次只需要增加当前时刻观察片段的处理即可。
作为actor-critic算法家族中的一员,RDPG中也使用了策略网络和价值网络。价值网络用于估计状态-动作的价值,然后把评分信息送给策略网络,用作动作函数在Q网络上的几何梯度(
)。
在更新中,首先通过扫描得到代表在过去一段时间内的状态向量h -1
f为策略网络或者价值网络中记录过去一段时间内的历史状态的RNN。
之后,计算从经验复用池中选取的小批量的TD梯度,也就是预测价值网络的误差梯度
:
为通过目标价值网络计算出的预期目标值。目标价值网络的输入是下一时刻的历史状态h
t+1,i
和下一时刻状态下的动作a
t+1,i
,该动作由目标策略网络μ′产生,其输入是h
t+1,i
,
的计算过程可描述为下式:
接着,使用Adam优化器通过最小化上述价值网络的误差来更新价值网络。然后,我们使用Q网络的梯度来计算策略网络的梯度:
依旧使用Adam网络优化器梯度优化策略网络。
在循环确定性神经网络中,还使用了经验复用来优化训练过程。经验复用在前文中已经有所介绍。在该网络中,经验复用步骤如下。首先,智能体选择要进行采样的多个episode;其次,在每个选择的episode中,选取一段长度为l的经验片段,并将这些片段堆积成为小批量;最后,基于RNN的actor-critic网络读取这些片段,衡量它们的目标,并且生成用于策略网络更新的参数的梯度。RDPG的网络训练流程图如图2.7所示。
图2.7 RDPG网络结构图
算法:RDPG
下面给出基于PyTorch实现的RDPG算法的部分代码解析。
策略网络结构解析
1. def __init__(self, state_dim, action_dim, action_lim): 2. super(Actor, self).__init__() 3. self.state_dim = state_dim 4. self.action_dim = action_dim 5. self.action_lim = action_lim 6. self.cnn = CNN() 7. self.fc1 = nn.Linear((64*9*9)*2, 800) 8. self.fc1.weight.data = fanin_init(self.fc1.weight.data.size()) 9. self.fc2 = nn.Linear(800, 800) 10. self.fc2.weight.data = fanin_init(self.fc2.weight.data.size()) 11. self.lstm = nn.LSTMCell(800, 800) 12. self.fc3 = nn.Linear(800, action_dim) 13. self.fc3.weight.data.uniform_(-EPS, EPS) 14. self.cx = torch.FloatTensor(1, 800).zero_().cuda() 15. self.hx = torch.FloatTensor(1, 800).zero_().cuda()
首先初始化策略网络的组成要素。需要初始化的有动作维数action_dim、状态维数state_dim,根据具体的环境,需要定义动作值的限制action_lim。
该段代码创建带有RNN的策略网络。RNN网络用于存储历史观测,产生估计真实状态h,后面的全连接网络用于根据真实状态产生动作。
首先,下面的网络由一层卷积神经网络(Convolutional Neural Network,CNN)来提取当前环境中的状态。CNN后面接两层全连接网络。全连接网络后接一个LSTM,用来存储之前的观察。LSTM的输入维度为800,输出为100,nstates为100,clock_periods=[1,2,4,8,16],后接一个全连接层,最终的输出为动作。
1. def forward(self, state, hidden_states = None): 2. state = state.float() 3. for i in range(state.shape[0]): 4. state1 = self.cnn.forward(state[i]) 5. if i == 0: 6. STATE = state1 7. if i ! = 0: 8. STATE = torch.cat((STATE, state1), dim = 1) 9. x = STATE 10. x = F.relu(self.fc1(x)) 11. x = F.relu(self.fc2(x)) 12. if hidden_states == None: 13. hx, cx = self.lstm(x, (self.hx, self.cx)) 14. self.hx = hx 15. self.cx = cx 16. else: 17. hx, cx = self.lstm(x, hidden_states) 18. x = hx 19. action = torch.tanh(self.fc3(x)) 20. action = action * self.action_lim 21. return action, (hx, cx)
策略网络拟合的是p(a|h),所以输出是给定观察下动作的值。上面代码是其前向传导网络定义。从前面的定义可知,状态在策略网络中,首先经过两个全连接层,之后经过一个LSTM网络,状态转变成了一个特征向量,这个特征向量通过一个全连接层后,经过tanh处理,形成最终的动作的输出。网络的输入形状为[n,state_dim],输出形状为[n,action_dim]。
首先,将从经验池中随机采样得到的状态通过CNN得到其特征,然后接下来这段代码的作用是对之前处理的观察结果和之前的历史状态结合,形成估计的真实状态。接着,将处理后的状态放入第三层全连接网络中,输出动作,并且通过tanh函数,以及之前输入的动作值限制,输出最终预估的动作。
价值网络结构解析
1. def __init__(self, state_dim, action_dim): 2. super(Critic, self).__init__() 3. self.state_dim = state_dim 4. self.action_dim = action_dim 5. self.cnn = CNN() 6. self.fc1 = nn.Linear((64*9*9+20)*2, 800) 7. self.fc1.weight.data = fanin_init(self.fc1.weight.data.size()) 8. self.fc2 = nn.Linear(800, 800) 9. self.fc2.weight.data = fanin_init(self.fc2.weight.data.size()) 10. self.fc3 = nn.Linear(800, 1) 11. self.fc3.weight.data.uniform_(-EPS, EPS)
首先,初始化价值网络的组成要素。价值网络需要初始化的参数有状态维数state_dim、动作维数action_dim。
其次,和之前的策略网络结构相似,定义价值网络的网络组成结构。由下面代码可知,我们的价值网络由一个卷积层CNN和三个全连接层self.fc1、self.fc2、self.fc3组成。
1. def forward(self, state, action): 2. state = state.float() 3. for i in range(state.shape[0]): 4. state1 = self.cnn.forward(state[i]) 5. op_batch = [] 6. for j in range(action.shape[0]): 7. if(action[j][i*3]> = 0): 8. op = np.ones(20) 9. else: 10. op = -1*np.ones(20) 11. op_batch.append(op) 12. op_batch_tensor = torch.from_numpy(np.array(op_batch)).float().cuda() 13. state1 = torch.cat((op_batch_tensor, state1), dim = 1) 14. if i == 0: 15. STATE = state1 16. if i! = 0: 17. STATE = torch.cat((STATE, state1), dim = 1) 18. x = STATE 19. x = F.relu(self.fc1(x)) 20. x = F.relu(self.fc2(x)) 21. x = self.fc3(x) 22. return x
上面这段代码定义了价值网络的前向传导。前向传导网络需要两个输入,一个是状态state[n,state_dim],一个是动作action[n,action_dim]。具体来说,critic网络使用torch.cat函数,将状态张量和动作张量连接到一起,共同作为网络的输入。在中间处理部分,前面已经说过,使用了三个全连接层进行学习,学习每个状态-动作对的输出价值估计,Q(s,a),形状为[n,1]。
两层循环展示了动作和状态具体是怎样连接在一起的。根据环境的不同,可以有不同的拼接方式。在此环境中,最外层循环为状态,对每个状态,首先经过CNN提取特征,之后是内层的循环,对于每一个动作进行判断处理,将处理后的动作和每层的状态连接到一起。最后形成的一个由状态和动作组成的张量放入网络中,得到输出。
训练过程解析
1. def __init__(self, state_dim, action_dim, action_lim, ram): 2. self.state_dim = state_dim 3. self.action_dim = action_dim 4. self.action_lim = action_lim 5. self.ram = ram 6. self.iter = 0 7. self.T = 0 8. self.noise = utils.OrnsteinUhlenbeckActionNoise(self.action_dim) 9. 10. self.actor = model.Actor(self.state_dim, self.action_dim, self.action_lim).cuda() 11. self.target_actor = model.Actor(self.state_dim, self.action_dim, self.action_lim).cuda() 12. self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), LEARNING_RATE) 13. 14. self.critic = model.Critic(self.state_dim, self.action_dim).cuda() 15. self.target_critic = model.Critic(self.state_dim, self.action_dim).cuda() 16. self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), LEARNING_RATE) 17. 18. utils.hard_update(self.target_actor, self.actor) 19. utils.hard_update(self.target_critic, self.critic)
首先初始化训练模块:需要传入的参数有状态空间维数state_dim、动作空间维数action_dim、动作值范围action_lim、经验复用池ram。需要初始化的参数有动作噪声self.noise、在线策略网络self.actor、目标策略网络self.target_actor、在线策略网络的优化器self.actor_optimizer、价值网络self.critic、目标价值网络self.target_critic、在线价值网络优化器self.critic_optimizer。由代码可知,在线网络与目标网络之间的复制过程为硬更新,即将在线网络参数直接复制给目标网络。为了稳定目标网络的更新,也可以使用本书中介绍的在线网络与目标网络之间的软更新。
1. def get_exploration_action(self, state): 2. with torch.no_grad(): 3. state = state.transpose((1, 0, 2, 3, 4)) 4. state = torch.from_numpy(state).cuda() 5. action, _ = self.actor.forward(state) 6. new_action = action.data.cpu().numpy() + (self.noise.sample() * self.action_lim) 7. return new_action
其次,定义动作产生函数,该函数的输入是给定状态,输出是添加了噪声之后的动作值。该函数中使用了之前定义的策略网络。self.actor的输入是状态state,其输出是根据当前状态以及在线策略网络的策略而得到的当前动作action,new_action为最终得到的动作,可以看出,它是加上了动作噪声之后产生的结果。
1. def optimize(self): 2. S, A, R, NS = self.ram.sample(64) 3. S = np.float32(S) 4. A = np.float32(A) 5. R = np.float32(R) 6. NS = np.float32(NS) 7. S = S.transpose(1, 0, 2, 3, 4, 5) 8. A = A.transpose((1, 0, 2)) 9. R = R.transpose((1, 0)) 10. NS = NS.transpose((1, 0, 2, 3, 4, 5)) 11. self.T = S.shape[0] 12. self.BATCH_SIZE = S.shape[1] 13. target_cx = torch.FloatTensor(self.BATCH_SIZE, 800).zero_().cuda() 14. target_hx = torch.FloatTensor(self.BATCH_SIZE, 800).zero_().cuda() 15. cx = torch.FloatTensor(self.BATCH_SIZE, 800).zero_().cuda() 16. hx = torch.FloatTensor(self.BATCH_SIZE, 800).zero_().cuda()
上面这段代码定义了网络的训练与优化模块:首先从经验复用池中取出n个片段,然后对数据进行规范化处理。状态向量S、动作向量A、回报向量R、转移后的状态向量NS,都是先从经验回访池self.ram中随机取得的。使用np.float函数将S、A、R、NS转换成浮点数数据格式。Numpy中transpose函数为张量提供了指定维度的交换,在数据处理中经常会用到,在本环境中,我们就把S、A、R、NS的第零维和第一维做了交换。self.BATCH_SIZE定义了用于训练的批量的大小,用状态向量S的第一维来定义。
1. for i in range(self.T): 2. s1 = torch.from_numpy(S[i].transpose((1, 0, 2, 3, 4))).cuda() 3. a1 = torch.from_numpy(A[i]).cuda() 4. r1 = torch.from_numpy(R[i]).cuda() 5. s2 = torch.from_numpy(NS[i].transpose((1, 0, 2, 3, 4))).cuda() 6. target_cx = target_cx.detach() 7. target_hx = target_hx.detach() 8. cx = cx.detach() 9. hx = hx.detach()
上面一段代码开始更新,首先,还是数据的规范化处理:当前状态s1和转移的状态s2均进行了维度的转换。Detach将返回一个新的从当前图中分离的变量。返回的变量永远不需要梯度。我们对target_cx、target_hx、cx和hx张量进行了detach操作。这些张量都要被运用于RNN。
1. with torch.no_grad(): 2. a2, (target_hx, target_cx) = self.target_actor.forward(s2, (target_hx, target_cx)) 3. next_val = torch.squeeze(self.target_critic.forward(s2, a2))
首先,将下一状态s2放入目标策略网络中,计算得到下一状态的目标动作。之后,使用产生的目标动作放入到目标价值网络self.target_critic中,计算出下一状态的目标价值next_val。
1. r1 = r1.reshape(-1) 2. y_expected = r1 + GAMMA * next_val
使用上面产生的目标价值next_val,计算折扣累积回报目标值y expected =r+γQ′(s 2 ,μ′(s 2 ))
1. y_predicted = torch.squeeze(self.critic.forward(s1, a1))
将当前状态s1和当前动作a1放入价值网络中,产生对于当前动作-状态的预测值y predicted =Q(s 1 ,a 1 )。
1. self.loss_critic = F.smooth_l1_loss(y_predicted, y_expected, reduction = 'elementwise_mean')
根据y_predicted和y_expected的误差,计算价值网络的损失loss,并且更新价值网络。
1. pred_a1, (hx, cx) = self.actor.forward(s1, (hx, cx))
上面这段代码用于更新策略网络。其输入是当前观察s1和之前的LSTM状态(hx,cx)。将其放入策略网络self.actor中,计算得到当前真实状态,并进行预测,得到当前的预测动作pred_a1。
1. self.loss_actor = -1 * torch.sum(self.critic.forward(s1, pred_a1)) 2. self.loss_actor = self.loss_actor.mean()
将当前状态s1和预测动作pred_a1放入价值网络中,得到预测值。
1. self.loss_critic.requires_grad_() 2. self.critic_optimizer.zero_grad() 3. self.loss_critic.backward() 4. self.critic_optimizer.step()
计算critic网络的损失函数self.loss_critic的梯度,并使用该梯度更新critic网络。
1. self.loss_actor.requires_grad() 2. self.actor_optimizer.zero_grad() 3. self.loss_actor.backward() 4. self.actor_optimizer.step()
计算策略网络的损失函数self.actor_loss的梯度,并使用其更新actor网络。
1. utils.soft_update(self.target_actor, self.actor, TAU) 2. utils.soft_update(self.target_critic, self.critic, TAU)
用在线策略网络和在线价值网络硬更新目标网络。
RDPG的提出,主要是解决智能体在POMDP环境中在无法得到全局状态的情况下怎样根据当前的观察以及之前的记忆经验来得到一个近似最优解的策略。由于在日常生活中,动作空间一般是连续的,而且状态空间也十分巨大,同时智能体也无法完全得到所有的环境信息,RDPG思想更切合实际,且比较符合大部分的生活场景。
虽然RDPG只是在一些模拟的环境(如OpenAI的gym库)中取得了较多的成果。但是这种利用之前存储的经验和当前的不完全观测,来推测完整环境状态的思想已经深入到目前很多研究的方向中。这种情况虽然更具挑战性,但是也更加贴合我们的实际生活。所以,在以后的研究工作中,在POMDP下的DRL任务会得到更多的关注。
人工智能领域的主要目标之一是从未经处理的高维度传感器输入中解决复杂任务。前文已经介绍过,通过将用于感知处理的深度神经网络和RL相结合,取得了重大的进展,产生了DQN算法。DQN能够在许多Atari视频游戏中使用未经处理的像素为输入,达到人类玩家的水准。
虽然DQN解决了高维观察空间的问题,但它只能处理离散和低维动作空间。许多有趣的任务,尤其是物理控制任务,具有连续(实际值)和高维度动作空间。DQN不能直接应用于连续域,因为它依赖于找到最大化动作值函数的动作,这在连续值的情况下需要在每个步骤进行迭代优化过程。
DQN的另一个缺点是,它采取随机的策略,也就是说在给定状态和参数的情况下,输出的动作服从一个概率分布,也就意味着每次走进这个状态的时候,输出的动作可能不同。这会导致行为有较多的变异性,我们的参数更新的方向很可能不是策略梯度的最优方向。与随机策略不同的是确定性策略,随机策略整合的是动作和状态空间,而确定性策略仅仅整合状态空间,也就是说,给定一个状态和参数,只输出一个确定的动作。为了探索完整的状态和动作空间,根据随机策略选择行动,而我们最终要学习的是一个确定的行为策略,在这个确定性策略梯度中,少了对动作的积分,多了回报函数对动作的导数。因此,其需要采样的数据少,算法效率高。
深度确定性策略梯度(Deep Deterministic Policy Gradien t,DDPG) [10] 中将动作策略的探索和动作策略的学习更新分开来。动作的探索仍然采用随机策略,而要学习的策略则是确定性策略。其次,其引入actor-critic框架,将策略网络与价值网络分开来。最后,其延续了DQN算法中的经验复用来对网络进行非策略训练,以最小化样本之间的相关性。它还使用目标Q网络训练网络,以在时间差异备份期间提供一致的目标。在DDPG算法中同时增加了批归一化(batch normalization)来防止梯度爆炸。
首先,我们介绍DDPG中的相关概念的定义。
· 确定性行为策略μ: 定义为一个函数,每一步的动作可以通过a t =μ(s t )计算获得。
· 策略网络: 用一个CNN对μ函数进行模拟,我们称这个网络为策略网络,其参数为θ μ 。
· 探索策略: 在智能体训练过程中,我们要兼顾探索和更新。探索是为了尽量探索到完整的动作状态空间。所以在训练过程中,引入了随机噪声,将动作的决策过程从确定性变为一个随机过程,再从这个随机过程中采样得到动作的值。过程如图2.8所示。
图2.8 DDPG中的动作选择
UO过程在时序上具备很好的相关性,可以使智能体很好地探索具备动量属性的环境。但是这个探索策略β不是我们最终要学习的动作策略。它仅仅用于训练过程,生成下达给环境的动作,从而获得我们想要的数据,然后利用这个数据去训练策略μ,以获得最优策略。
· Q网络: 我们用一个CNN对Q函数进行模拟,其参数为θ Q 。
· 衡量策略μ的表现: 使用一个函数J π (μ)来衡量当前学习到的策略的好坏:
其中,Q μ 是在每个状态下,智能体都按照μ策略选择动作,能够产生的Q值。J π (μ)也就是在状态s服从ρ π 分布时,按照策略μ,Q μ (s,μ(s))的期望。
· 训练的目标: 最大化J π (μ),同时最小化价值网络的损失。
· 最优行为策略μ: 就是使得J π (μ)最大的策略,
为了克服DQN中网络更新不稳定的缺点,DDPG分别为在线策略网络、在线价值网络各创建一个拷贝,叫作目标网络。每训练完一个小批量的数据后,在线策略网络和在线价值网络通过软更新算法来更新目标网络的参数:
这种逐步迭代更新求平均值的方法,保证了目标网络更新的稳定性。一般来说,τ取0.001。DDPG的网络框架如图2.9所示。
图2.9 DDPG网络结构图
首先在系统中要建立一个在线策略网络和一个在线价值网络,之后,按照相同的网络结构建造一个目标策略网络和一个目标价值网络,而在线网络和目标网络的参数初始化为相同。
在更新阶段,首先从经验复用池D中取得若干经验。先通过目标网络得到目标回报值y i 。具体步骤是,首先将下一状态向量s i+1 放入目标策略网络,得到动作a i+1 。之后,将目标动作和下一状态向量s i+1 连接在一起,共同作为目标价值网络的输入,得到目标值Q′,之后根据公式
得到目标回报值y i 。
之后,更新在线价值网络。首先,将s i 、a i 共同作为输入,放入在线价值网络,得到实际值Q。然后根据误差方程
得到在线价值网络的误差,通过最小化误差来更新该网络。
然后,更新在线策略网络。注意,由于DDPG是异步更新策略,也就是说动作的探索和策略网络的更新所采用的策略不是一种策略,动作的探索包含随机策略,而策略网络则是确定性更新。所谓确定性更新,就是说先把s
t
作为输入放入在线策略网络,得到确定性动作a。然后将向量s
t
和a连接在一起,共同作为在线价值网络的输入,求得策略梯度
。根据策略梯度来确在线策略网络的更新方向。
算法:DDPG
下面给出基于TensorFlow实现的DDPG算法的部分代码解析。
策略网络结构解析
1. def __init__(self, sess, action_dim, action_bound, learning_rate, replacement): 2. self.sess = sess 3. self.a_dim = action_dim 4. self.action_bound = action_bound 5. self.lr = learning_rate 6. self.t_replace_counter = 0 7. self.replacement = replacement 8. with tf.variable_scope('Actor'): 9. self.a = self._build_net(S, scope = ?eval_net?, trainable = True) 10. self.a_ = self._build_net(S_, scope = 'target_net', trainable = False) 11. self.e_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope = 'Actor/eval_net') 12. self.t_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope = 'Actor/target_net') 13. self.soft_replace = [tf.assign(t, (1 - self.replacement['tau']) * t + self.replacement['tau'] * e) for t, e in zip(self.t_params, self.e_params)]
该段代码对策略网络进行了初始化。需要初始化的为动作维度self.a_dim、会话self.sess、动作值限制self.action_bound、actor网络的学习率self.lr、更新参数字典self.replacement。
首先建立一个在线策略网络self.a,之后建立一个初始化参数相同且结构相同的目标策略网络self.a_。定义self.t_params为在线网络中的参数、self.e_params为目标网络中的参数。可以看出在线网络与目标网络之间的为软更新。软更新参数τ从self.replacement中获得。
1. def _build_net(self, s, scope, trainable): 2. with tf.variable_scope(scope): 3. init_w = tf.random_normal_initializer(0., 0.3) 4. init_b = tf.constant_initializer(0.1) 5. net = tf.layers.dense(s, 30, activation = tf.nn.relu, kernel_initialize r = init_w, bias_initializer = init_b, name = 'l1', trainable = trainable) 6. with tf.variable_scope('a'): 7. actions = tf.layers.dense(net, self.a_dim, activation = tf.nn.tanh, kernel_initializer = init_w, bias_initializer = init_b, name = 'a', trainable = trainable) 8. scaled_a = tf.multiply(actions, self.action_bound, name = 'scaled_a') 9. return scaled_a
上面这段代码的作用是搭建产生确定性动作的策略网络θ μ ,其输入是当前状态s t ,使用relu激活函数,最终的输出是在action_bound范围内的动作a t 。
具体来看,首先我们定义了全连接层的权重init_w和偏置init_b,之后,我们使用self.layers.dense定义了一个全连接层,该全连接层的输入是状态s,输出向量的大小为30(根据实际情况自定义),该层的激活函数为relu。在该层之后,又连接了一个全连接层,该全连接层的输出大小为self.a_dim,即对每一维的动作输出动作值。最后,使用tf.multiply函数对动作值的范围加以限制,输出最终合法的预测动作。
1. def add_grad_to_graph(self, a_grads): 2. with tf.variable_scope('policy_grads'): 3. self.policy_grads = tf.gradients(ys = self.a, xs = self.e_params, grad_ys = a_grads) 4. 5. with tf.variable_scope('A_train'): 6. opt = tf.train.AdamOptimizer(self.lr) # (- learning rate) for ascent policy 7. self.train_op = opt.apply_gradients(zip(self.policy_grads, self.e_params))
上面这段代码定义了策略梯度self.policy_grads以及策略网络的优化器self.train_op。在策略梯度函数中,ys的输入为在线更新策略网络θ
μ
,xs的输入为该策略网络的参数,grad_y的输入为价值网络在动作方向上的梯度
,策略梯度为
。
1. def choose_action(self, s): 2. s = s[np.newaxis, :] # single state 3. return self.sess.run(self.a, feed_dict = {S: s})[0] # single action
上面这段代码的作用是产生确定性动作,传入状态s,通过self.sess.run运行在线策略网络,产生预测的动作值。
1. def learn(self, s): 2. self.sess.run(self.train_op, feed_dict = {S: s}) 3. if self.replacement[?name?] == 'soft': 4. self.sess.run(self.soft_replace)
上面这段代码是策略网络的学习与目标网络的更新。self.train_op是对在线策略网络进行梯度更新,其需要的输入为当前状态s。而在前文已经定义了self.soft_replace的方式,使用在线网络,对目标网络进行迭代更新。
价值网络结构解析
1. def __init__(self, sess, state_dim, action_dim, learning_rate, gamma, replacement, a, a_): 2. self.sess = sess 3. self.s_dim = state_dim 4. self.a_dim = action_dim 5. self.lr = learning_rate 6. self.gamma = gamma 7. self.replacement = replacement 8. with tf.variable_scope('Critic'): 9. self.a = tf.stop_gradient(a) 10. self.q = self._build_net(S, self.a, 'eval_net', trainable = True) 11. self.q_ = self._build_net(S_, a_, 'target_net', trainable = False) 12. self.e_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope = 'Critic/eval_net') 13. self.t_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope = 'Critic/target_net')
上面这段代码对价值网络进行了初始化。需要初始化的参数为会话self.sess、状态维度self.s_dim、动作维度self.a_dim、critic网络的学习率self.lr、折扣因子self.gamma、软更新参数字典self.replacement。还需要传入动作a以及目标策略网络产生的动作a_。
为了防止价值网络的梯度更新影响到策略网络,使用tf.stop_gradient函数来阻止。self.q为在线价值网络对当前状态-动作的值估计。self.q_为目标critic网络的输出,其输入是由目标价值网络根据下一状态S_输出的动作a_、下一状态S_。
self.e_params为在线价值网络的参数θ
Q
,self.t_params为目标价值网络的网络参数
。
1. with tf.variable_scope('target_q'): 2. self.target_q = R + self.gamma * self.q_ 3. 4. with tf.variable_scope('TD_error'): 5. self.loss = tf.reduce_mean(tf.squared_difference(self.target_q, self.q)) 6. 7. with tf.variable_scope('C_train'): 8. self.train_op = tf.train.AdamOptimizer(self.lr).minimize(self.loss) 9. 10. with tf.variable_scope('a_grad'): 11. self.a_grads = tf.gradients(self.q, a)[0] # tensor of gradients of each sample (None, a_dim) 12. 13. self.soft_replacement = [tf.assign(t, (1 - self.replacement['tau']) * t + self.replacement['tau'] * e) for t, e in zip(self.t_params, self.e_params)]
上面这段代码仍然是在线价值网络的初始化,其分别定义了:
·目标值self.target_q,对应前文中的yi。该目标值使用当前回报R和目标价值网络的预测值来计算。
·在线价值网络的损失函数self.loss。使用目标价值网络输出的目标值self.target和在线价值网络输出的值估计self.q之间的差平方来估计。
·在线价值网络的优化器self.train_op。使用Adam优化器,学习率为self.lr,目标为最小化损失函数self.loss。
·在线价值网络在动作维度上的梯度self.a_grads(在策略网络的梯度计算中会用到),以及在线价值网络和目标价值网络之间的软更新。
1. def _build_net(self, s, a, scope, trainable): 2. with tf.variable_scope(scope): 3. init_w = tf.random_normal_initializer(0., 0.1) 4. init_b = tf.constant_initializer(0.1) 5. 6. with tf.variable_scope('l1'): 7. n_l1 = 30 8. w1_s = tf.get_variable('w1_s', [self.s_dim, n_l1], initializer = init_w, trainable = trainable) 9. w1_a = tf.get_variable('w1_a', [self.a_dim, n_l1], initializer = init_w, trainable = trainable) 10. b1 = tf.get_variable('b1', [1, n_l1], initializer = init_b, trainable = trainable) 11. net = tf.nn.relu(tf.matmul(s, w1_s) + tf.matmul(a, w1_a) + b1) 12. 13. with tf.variable_scope('q'): 14. q = tf.layers.dense(net, 1, kernel_initializer = init_w, bias_initializer = init_b, trainable = trainable) # Q(s, a) 15. return q
上面这段代码具体定义了价值网络的网络架构,其输入是动作和状态,输出是对于状态-动作对的值估计Q(s,a)。
具体来说,首先定义用于初始化的权重init_w和偏置init_b。之后n_l1代表第一层神经网络的输出维度。w1_s为状态向量对应的权重,w1_a为动作向量对应的权重,b1为第一层的偏置,net为第一层网络经过relu激活后的输出。
net后面连接了一个全连接层,其输出维度为1,代表对于每个状态-动作的值估计。
1. def learn(self, s, a, r, s_): 2. self.sess.run(self.train_op, feed_dict = {S: s, self.a: a, R: r, S_: s_}) 3. self.sess.run(self.soft_replacement) 4. self.t_replace_counter += 1
上面这段代码的作用是向图中传入对应的参数,进行在线价值网络的更新:
首先,传入了当前状态s、动作a、回报r和下一状态s_,得到价值网络的更新。
下一步,通过self.soft_replacement,使用在线价值网络,对目标价值网络进行软更新。
1. class Memory(object): 2. def __init__(self, capacity, dims): 3. self.capacity = capacity 4. self.data = np.zeros((capacity, dims)) 5. self.pointer = 0 6. 7. def store_transition(self, s, a, r, s_): 8. transition = np.hstack((s, a, [r], s_)) 9. index = self.pointer % self.capacity # replace the old memory with new memory 10. self.data[index, :] = transition 11. self.pointer += 1 12. 13. def sample(self, n): 14. assert self.pointer >= self.capacity, 'Memory has not been fulfilled' 15. indices = np.random.choice(self.capacity, size = n) 16. return self.data[indices, :]
以上是经验复用池的定义。
如上所示,首先定义了回放池所能存储的经验的多少self.capacity。self.data定义了具体存放经验的数组。self.pointer为当前存入经验池的位置,初始化为0。
store_transition函数具体实现了经验的存放。首先,使用np.hstack将4个向量(s、a、[r]、s_)合并为一个transition向量。之后,index为当前要存储的经验复用池的位置,之后经验池位置指针self.pointer后移一位。经验在经验池复用中是覆盖存储的。
sample方法为在经验复用池中采样,采样的大小为n。
和DQN算法或者单一的策略梯度更新算法相比,DDPG中在线网络和目标网络的使用,以及软更新算法的应用,使得学习的过程更加稳定,模型的收敛更加有保障。同时,DDPG可用于高维度动作状态空间,甚至是连续的空间,这使得DDPG算法更有可能应用于实际更加复杂的任务。同时,DDPG所使用的确定性策略所需要的样本数少,且不需要对动作空间进行积分,大大降低了算法的复杂度。
正是这些优良的特性,使得DDPG能够得到大家的认可并广泛应用于各式复杂的控制任务中。
信赖域策略优化(Trust Reign Policy Gradie nt,TRPO)属于策略梯度算法的一种。策略梯度算法在连续控制领域取得了一些成就,但是,其缺点也逐渐暴露出来:
·很难选择合适的更新步长。由于策略是不断改变的,所以输入数据是不固定的,也就导致了观察和回报的分布是变化的。所以,很难在变化中选择一个固定的步长,适应所有的情况。如果选择的步长过大,则很有可能更新到一个坏的策略,而下一批的观察数据也是在更新后的坏策略下得到的,这就可能使策略无法恢复,造成模型表现的崩塌。而如果选择的步长过小,可能需要上千万步迭代,令人绝望。
·数据采样效率低。一次采样出的数据只能用于更新策略一次。
所以,TRPO要解决的问题就是,如何选择合适的步长,找到新的策略使得新的回报函数的值增加,或者说单调不减。所谓信赖域,就是指在此区域内,我们认为函数的局部近似是准确的。TRPO算法的提出就用来解决这个问题。
为了解决上述问题,一个很自然的想法是,能不能把新策略下的预期回报表示为旧策略下的预期回报加上一个余项,让这个余项大于等于零,就能保证更新到一个更好的策略中了。令η(π)为在策略π下的累积预期回报:
令π′为新策略,π为旧策略,则新策略的累积预期回报可以表示为:
代表动作从新策略分布π′(·|s
t
)中采样得到。其中
A π (s,a)叫作优势函数,用来评价当前动作值相对于平均值的大小。如果A π >0,说明该动作比平均动作好。
将上式进一步展开得:
进一步变形,得到:
其中,ρ π 为折扣访问频率:
注意这里的状态是由新策略产生的,对于新策略有很强的依赖性,但是在计算的时候,新策略是不知道的。所以,在TRPO中,使用旧策略代替新策略,因为两者差距不大。
上面的等式证明了对于任意一对策略更新(π→π′),如果对于每一个状态s,都有一个非负的优势期望,就可以保证提高策略表现η。特殊地,如果优势期望处处得零,则策略表现不变。此等式也表示如何判断一个策略已经达到了最优策略:如对于一个确定性策略更新网络,π′(s)=argmax a A π (s,a),如果有至少一对状态-动作的优势函数值为0并且状态访问概率不为0,则可以改进策略;否则,该策略就已经收敛到了最优策略。
TRPO引入了重要性采样处理动作分布,提高了数据采样效率。所谓重要性采样,就是使得和环境互动的策略与要更新的策略不是同一个策略,这样,通过和环境互动获得的采样数据就能够被多次应用在新策略的更新中:
L π (π′)与η(π)的唯一区别是状态分布不同,事实上L π (π′)是η(π)的一阶近似。因此,在旧策略附近,能够改善L的策略也能改善η。下一步的核心问题是,更新的步长怎么取。为此,引入一个非常重要的不等式:
其中,
,而
为每个状态下动作分布的散度的最大值。为了使问题简化,使用平均散度代替最大散度。最终,问题等价为希望找到参数θ满足:
其中,θ为新策略对应的参数,θ old 为旧策略对应的参数。
在TRPO的论文中,仅给出了一个简短的面对离散策略的情况,如下所示:
算法:TRPO
下面给出基于PyTorch实现的TRPO算法的部分代码解析。
1)策略网络结构解析
1. class Policy(nn.Module): 2. def __init__(self, num_inputs, num_outputs): 3. super(Policy, self).__init__() 4. self.affine1 = nn.Linear(num_inputs, 64) 5. self.affine2 = nn.Linear(64, 64) 6. self.action_mean = nn.Linear(64, num_outputs) 7. self.action_mean.weight.data.mul_(0.1) 8. self.action_mean.bias.data.mul_(0.0) 9. self.action_log_std = nn.Parameter(torch.zeros(1, num_outputs)) 10. self.saved_actions = [] 11. self.rewards = [] 12. self.final_value = 0
该段代码定义了策略网络。要传入的参数是策略网络输入维数(状态的维数)和策略网络输出维数(动作的维数)。nn.Linear代表线性层,self.affine1为线性输入层,self.affine2为中间层,self.action_mean为通过神经网络输出的动作值的平均值,该层的权重初始化为0.1,偏置初始化为0。self.action_log_std为动作的标准差的对数值。由于动作是连续的,通过动作的均值和方差,我们就能从该分布中选取动作。
1. def forward(self, x): 2. x = torch.tanh(self.affine1(x)) 3. x = torch.tanh(self.affine2(x)) 4. 5. action_mean = self.action_mean(x) 6. action_log_std = self.action_log_std.expand_as(action_mean) 7. action_std = torch.exp(action_log_std) 8. 9. return action_mean, action_log_std, action_std
随后,在该策略模型中,要定义前向传导函数:输入的x为状态,通过神经层后,要经过一个tanh激活函数,求得action_mean和action_log_std,通过求指数,将对数还原为真正的动作的标准差,这是在写策略网络中的一个小技巧。
2)价值网络结构解析
1. class Value(nn.Module): 2. def __init__(self, num_inputs): 3. super(Value, self).__init__() 4. self.affine1 = nn.Linear(num_inputs, 64) 5. self.affine2 = nn.Linear(64, 64) 6. self.value_head = nn.Linear(64, 1) 7. self.value_head.weight.data.mul_(0.1) 8. self.value_head.bias.data.mul_(0.0) 9. 10. def forward(self, x): 11. x = torch.tanh(self.affine1(x)) 12. x = torch.tanh(self.affine2(x)) 13. 14. state_values = self.value_head(x) 15. return state_values
和策略网络类似,价值网络也是由一个输入层、一个中间层和一个输出层组成的。可以看出,对于每一个状态,只输出一个价值。输出层的权重初始化为0.1,偏置初始化为0。
在前向传导函数中,输入状态x首先经过self.affine1层,然后经过tanh激活函数,之后经过self.affine2层,经过tanh激活,最终,通过self.value_head输出层,输出state_values。
3)更新解析
1. def update_params(batch): 2. rewards = torch.Tensor(batch.reward) 3. masks = torch.Tensor(batch.mask) 4. actions = torch.Tensor(np.concatenate(batch.action, 0)) 5. states = torch.Tensor(batch.state) 6. values = value_net(Variable(states)) 7. 8. returns = torch.Tensor(actions.size(0), 1) 9. deltas = torch.Tensor(actions.size(0), 1) 10. advantages = torch.Tensor(actions.size(0), 1) 11. 12. prev_return = 0 13. prev_value = 0 14. prev_advantage = 0
更新函数的输入是更新样本批量,批量中含有奖励rewards、结束标志masks、动作actions、状态states。传入后,为了将其加入动态图中,要将其变为torch.Tensor。向价值网络中传入批量状态,得到这些状态的预估价值values。
在更新中,还要求得的变量为累积回报returns、优势advantages等,所以需要提前声明。
1. for i in reversed(range(rewards.size(0))): 2. returns[i] = rewards[i] + args.gamma * prev_return * masks[i] 3. deltas[i] = rewards[i] + args.gamma * prev_value * masks[i] - values.data[i] 4. advantages[i] = deltas[i] + args.gamma * args.tau * prev_advantage * masks[i] 5. prev_return = returns[i, 0] 6. prev_value = values.data[i, 0] 7. prev_advantage = advantages[i, 0] 8. targets = Variable(returns)
首先,从后往前求得折扣回报。特殊地,如果下一步状态为完成,则当前的回报为当前的奖励,否则,再加上下一步的回报乘一个折扣因子。
对于优势函数来说,也是从后往前求,只不过,此处的优势包含后面时间步的优势的折扣累积。
1. flat_params, _, opt_info = scipy.optimize.fmin_l_bfgs_b(get_value_loss, get_flat_params_from(value_net).double().numpy(), maxiter = 25) 2. set_flat_params_to(value_net, torch.Tensor(flat_params))
上面两步用于更新价值网络,get_value_loss函数的作用根据target计算得到价值网络的损失,传入scipy库中的optimize.fmin_l_bfgs_b最小化优化函数,计算得到网络更新后的参数,再通过set_flat_params_to函数进行更新模型。上面用到的三个函数定义如下:
1. def set_flat_params_to(model, flat_params): 2. prev_ind = 0 3. for param in model.parameters(): 4. flat_size = int(np.prod(list(param.size()))) 5. param.data.copy_( 6. flat_params[prev_ind:prev_ind + flat_size].view(param.size())) 7. prev_ind += flat_size
set_flat_params_to函数用于将flat_params中存储的参数值复制到model里面。其流程是,对于model中的所有参数,首先,通过param.size获得参数的形状,然后使用np.prod将各维的大小相乘,得到“变平”后的向量的长度flat_size。params.data.copy_为原地复制操作,直接改变了params中的值。view的功能就是改变张量的形状,可见,其将flat_params中param的对应字段取出,改为参数在模型中的原始形状,然后赋值给模型中的对应参数。
1. def get_flat_params_from(model): 2. params = [] 3. for param in model.parameters(): 4. params.append(param.data.view(-1)) 5. flat_params = torch.cat(params) 6. return flat_params
get_flat_params_from函数是将model中的参数取出,然后“变平”。其中view(-1)的作用就是把param.data从一个高维的张量展平成一维。使用torch.cat将一个列表变为一个张量,并返回。
1. def get_value_loss(flat_params): 2. set_flat_params_to(value_net, torch.Tensor(flat_params)) 3. for param in value_net.parameters(): 4. if param.grad is not None: 5. param.grad.data.fill_(0)
get_value_loss函数定义在更新模块update_params内部,用于计算价值网络的损失。首先,由于传递模型十分浪费时间和占用资源,所以我们只传递模型中的参数,然后把模型中的参数返回给value_net。此处循环的目的是在计算之前要将所有的梯度赋值成0。
1. values_ = value_net(Variable(states)) 2. value_loss = (values_ - targets).pow(2).mean() 3. for param in value_net.parameters(): 4. value_loss += param.pow(2).sum() * args.l2_reg
values_为通过价值网络对状态进行的值估计,那么就可以用预估值和目标值之间的差距来衡量这个价值网络的损失。最后,为了减少过拟合,再在损失中加上正则项,即模型中参数的L2范数。
1. value_loss.backward() 2. return (value_loss.data.double().numpy(), get_flat_grad_from(value_net).data. double().numpy())
最后,使用PyTorch自带的反向传导函数计算梯度,并返回。
1. advantages = (advantages - advantages.mean()) / advantages.std() 2. action_means, action_log_stds, action_stds = policy_net(Variable(states)) 3. fixed_log_prob = normal_log_density(Variable(actions), action_means, action_log_stds, action_stds).data.clone()
上段代码开始,就在更新策略网络。首先,将优势归一化(即减去方差后除以标准差)。之后,要求动作分布(假设动作服从的是正态分布,即概率密度
)。把状态放入策略网络中,得到动作分布的均值action_means、标准差的对数action_log_stds、标准差action_stds,将其放入narmal_log_density,求得该正态分布的对数值,即log(p)。
1. def normal_log_density(x, mean, log_std, std): 2. var = std.pow(2) 3. log_density = -(x - mean).pow(2) / ( 4. 2 * var) - 0.5 * math.log(2 * math.pi) - log_std 5. return log_density.sum(1, keepdim = True)
上面这个函数用于求动作的正态分布的对数值,把动作均值、动作标准差代入密度函数方程即可。
1. trpo_step(policy_net, get_loss, get_kl, args.max_kl, args.damping)
对策略网络进行更新,首先对其中要用到的几个函数进行讲解:
·get loss:求策略网络的损失,对应公式L
Policy
=
。
·get_kl:求得两个分布在行动上的散度距离。
1. def trpo_step(model, get_loss, get_kl, max_kl, damping): 2. loss = get_loss() 3. grads = torch.autograd.grad(loss, model.parameters()) 4. loss_grad = torch.cat([grad.view(-1) for grad in grads]).data
首先,求得策略网络的损失loss,之后,求loss对策略网络参数的导数grads,并将其展开成一维的loss_grad。
1. stepdir = conjugate_gradients(Fvp, -loss_grad, 10) 2. 3. shs = 0.5 * (stepdir * Fvp(stepdir)).sum(0, keepdim = True) 4. 5. lm = torch.sqrt(shs / max_kl) 6. fullstep = stepdir / lm[0] 7. 8. neggdotstepdir = (-loss_grad * stepdir).sum(0, keepdim = True)
计算共轭梯度stepdir,其中Fvp为求共轭梯度中需要用到的一个函数,具体的写法请见代码。
1. prev_params = get_flat_params_from(model) 2. success, new_params = linesearch(model, get_loss, prev_params, fullstep, neggdotstepdir / lm[0])
得到model中的参数pre_params,使用线搜索进行梯度下降,使得不论从哪一点开始,都能稳定收敛到一个局部最优解。具体写法请见代码。
1. set_flat_params_to(model, new_params) 2. return loss
将更新的参数赋值给策略网络。至此,两个网络的更新完毕。
策略梯度方法的缺点是数据效率和鲁棒性不好。同时TRPO方法又比较复杂,且不兼容dropout(在深度神经网络训练过程中按照一定概率对网络单元进行丢弃)和参数共享(策略和价值网络间)。后面研究者提出了近端策略优化算法,它是对TRPO算法的改进,更易于实现,且数据效率更高。TRPO方法中通过使用约束而非惩罚项来保证策略更新的稳定性,主要原因是作为惩罚项的话会引入权重因子,而这个参数难以调节。所以,虽然TRPO把信赖域引入了RL,但是由于实现的高复杂性,一般不推荐使用TRPO,而是使用和TRPO方向一致,但是更简单高效的近端策略优化算法。
在RL的发展中,涌现出了很多优秀算法,在一些领域取得了良好的效果,比较显著的有电脑游戏、围棋、三维运动。然而,各类算法的缺点也被研究者们逐渐发现。Q-learning算法在很多简单的问题中都不能被应用。策略梯度算法虽然在有一定难度的问题中取得了一些成效,但是这类方法对于迭代步骤的数量非常敏感:如果选的太小,训练的过程就会令人绝望;如果选的太大,反馈信号就会淹没在噪声中,甚至有可能使训练模型呈现雪崩式的下降。这类方法的采样效率也非常低,学习简单的任务就需要百万级甚至以上的总迭代次数。
为了解决这些问题,研究人员找到了TRPO算法。这种方法对于策略更新的大小做出了限制。虽然达到了提高鲁棒性和提高采样效率的目的,但是也在其他方面付出了代价。TRPO算法虽然在连续控制方面取得了很好的效果,但是其实现相对较为复杂,并且对策略函数和价值函数或者辅助损失之间有共享参数的算法难以兼容,比如Atari和其他一些视觉输出占据主要部分的任务。
近端策略优化算法(Proximal Policy Optimiza tion,PPO)的提出旨在借鉴TRPO算法,使用一阶优化,在采样效率、算法表现,以及实现和调试的复杂度之间取得了新的平衡。这是因为PPO会在每一次迭代中尝试计算新的策略,让损失函数最小化,并且保证每一次新计算出的策略能够和原策略相差不大。目前,OpenAI已经把PPO作为自己RL研究中的首选算法。
图2.10中的机器人就是使用PPO算法进行训练,他要学会走路、跑步、转弯来接近一个随即移动的球形目标,环境中还有一个白色小球会撞击机器人,给机器人的学习增加难度。所以他还要学会保持平衡,甚至在被小球撞击后主动站起来。
图2.10 gym训练环境
策略梯度算法通过计算一个策略梯度估计,然后使用随机梯度下降算法,最终得到一个好的策略。最广泛应用的梯度估计有如下形式:
其中,π
θ
为一个随机策略函数,
为在t时刻对于优势函数的估计。所谓优势函数,就是当前策略与环境互动所获得的评分与基准之间的差距。如果
>0,说明当前“状态-动作”对能够得到比基准更高的回报,反之,获得的回报更低。式(2.52)中,
代表着对于一批样本的期望值。使用这种传统on-policy,即与环境互动的策略和需要更新的策略是一个策略,它的缺点是采样效率低,采样出的数据只能用于更新一次策略。所以,我们要引入一个另外的策略q和环境进行互动,互动出来的经验样本可以用于多次更新策略p。我们有如下的公式推导:
可以看出来,我们使用从策略q中采样的样本,就能用于更新策略p。但是,其中存在一定的问题,即当p与q分布相近时,得到的期望才能近似相等,否则,可能会产生很大的差别,比如图2.11中这种情况:
图2.11 重要性采样
假设采样不足,p分布的采样集中在x轴负半轴,q分布的采样集中在x轴正半轴。在p分布中,E
x~p
[f(x)]<0,而在q分布中,
,两者之间就会差距很大。所以,要限制p和q策略之间不能相差过大。
在PPO中,使用旧策略π与环境做交互,交互采集的样本用于更新策略π′,θ为新策略的网络参数,因为一次交互产生的样本可以用于连续多次更新,所以PPO算法提高了采样效率。在TRPO中,使用一个硬约束对使得π′与π相差不要过远。而在PPO中,使用一个一阶优化对其进行约束,原问题转化成了:
KL为一个散度计算函数,计算的是相同的行为在两个不同的分布之间的距离。PPO的提出者通过实验也发现了简单选择一个固定的惩罚因子β不能解决问题,要对β做一些改进。如果KL[π,π′]>KL max ,增大惩罚因子β;如果KL[π,π′]<KL min ,减小β。
由于散度的计算不易理解,也较为困难,PPO的提出者又提出了一种截断的方法,来限制新策略的更新。其损失函数定义为:
其中,r
t
(θ)=
,ε为一个超参,一般建议取0.2。这个截断的思想也很好理解,当
>0时,说明当前的行动产生的回报估计要大于基准行动的预期回报,所以我们更新策略π′,要让该行动出现的概率越大越好,但是要给这个加一个限制,即不能高于原策略的1+ε倍。同理,若
<0,说明当前的行动产生的回报估计要小于基准行动的预期回报,所以我们要让该行动出现的概率越小越好,但是不能小于原策略的1-ε倍。
PPO在actor-critic结构中的算法,其伪代码如下所示。
算法:PPO,actor-critic结构
下面给出基于TensorFlow实现的PPO算法的部分代码解析。
1)策略网络结构解析
1. def _build_anet(self, name, trainable): 2. with tf.variable_scope(name): 3. l1 = tf.layers.dense(self.tfs, 100, tf.nn.relu, trainable = trainable 4. mu = 2 * tf.layers.dense(l1, A_DIM, tf.nn.tanh, trainable = trainabl 5. sigma = tf.layers.dense(l1, A_DIM, tf.nn.softplus, trainable = trainable) 6. norm_dist = tf.distributions.Normal(loc = mu, scale = sigma) 7. params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope = name) 8. return norm_dist, params
该段代码定义了策略网络。要传入的参数是scope的名字name,以及一个布尔标志trainable。这是由于旧策略只用于和环境交互产生样本,在更新策略的时候,旧策略不能被更新。该策略网络的输入为状态self.tfs,A_DIM为动作的维数。tf.distributions.Normal定义了一个均值为mu、标准差为sigma的正态分布。params为该网络中的参数。该方法返回的是一个正态分布,以及该策略网络的参数。
1. # actor 2. pi, pi_params = self._build_anet('pi', trainable = True) 3. oldpi, oldpi_params = self._build_anet('oldpi', trainable = False) 4. with tf.variable_scope('sample_action'): 5. self.sample_op = tf.squeeze(pi.sample(1), axis = 0) # choosing action 6. with tf.variable_scope('update_oldpi'): 7. self.update_oldpi_op = [oldp.assign(p) for p, oldp in zip(pi_params, oldpi_params)]
如上代码定义actor,pi代表要更新的策略,oldpi代表与环境交互产生样本的旧策略,oldpi网络中的参数不更新,所以trainable参数为False。self.sample_op为从在线策略网络中选择动作与环境进行交互。self.update_oldpi_op为迭代旧策略。
1. self.tfa = tf.placeholder(tf.float32, [None, A_DIM], 'action') 2. self.tfadv = tf.placeholder(tf.float32, [None, 1], 'advantage') 3. with tf.variable_scope('loss'): 4. with tf.variable_scope(?surrogate?): 5. # ratio = tf.exp(pi.log_prob(self.tfa) - oldpi.log_prob(self.tfa)) 6. ratio = pi.prob(self.tfa) / oldpi.prob(self.tfa) 7. surr = ratio * self.tfadv
上面是在actor中对于r
t
(θ)的定义,ratio对应上文中的r
t
(θ),是新策略与旧策略的比值。self.tfa是对于动作的占位符,里面应该填上采样的动作。self.tfadv为实际计算出来的优势
的值的占位符。
1. if METHOD["name"] == "kl_pen": 2. self.tflam = tf.placeholder(tf.float32, None, 'lambda') 3. kl = tf.distributions.kl_divergence(oldpi, pi) 4. self.kl_mean = tf.reduce_mean(kl) 5. self.aloss = -(tf.reduce_mean(surr - self.tflam * kl))
上面代码定义的是策略网络的一种损失计算方式。第一种为散度惩罚方法。self.tflam为上文中的惩罚因子β。kl为计算的新旧策略之间的散度。
1. else: # clipping method, find this is better 2. self.aloss = -tf.reduce_mean(tf.minimum( surr, tf.clip_by_value(ratio, 1.+METHOD['epsilon'], 1.+METHOD['epsilon'])*self.tfadv))
第二种为截断方法。对应上文中的L CLIP (θ)。
2)价值网络结构解析
1. # critic 2. with tf.variable_scope('critic'): 3. l1 = tf.layers.dense(self.tfs, 100, tf.nn.relu) 4. self.v = tf.layers.dense(l1, 1) 5. self.tfdc_r = tf.placeholder(tf.float32, [None, 1], 'discounted_r') 6. self.advantage = self.tfdc_r - self.v 7. self.closs = tf.reduce_mean(tf.square(self.advantage)) 8. self.ctrain_op = tf.train.AdamOptimizer(C_LR).minimize(self.closs)
由于一个PPO类中只需要一个价值网络,所以可以将网络直接定义在类的初始化函数中。价值网络的输入是状态self.tfs,中间经过一层全连接层(按照环境情况自己设定),输出为对该状态的价值估计self.v。self.tfdc_r定义了baseline,是一个占位符,里面要放入预计累积回报。而self.advantage为优势函数,记录了当前策略与baseline策略之间的回报差距,对应
。self.ctrain_op定义了critic网络的训练优化器,推荐采用adam优化器。
3)策略更新函数解析
1. def update(self, s, a, r): 2. self.sess.run(self.update_oldpi_op) 3. adv = self.sess.run(self.advantage, {self.tfs: s, self.tfdc_r: r})
首先,迭代旧策略。之后,向占位符self.tfs中传入状态s、向占位符self.tfdc_r中传入累积回报r,计算得到优势adv。
1. if METHOD['name'] == 'kl_pen': 2. for _ in range(A_UPDATE_STEPS): 3. _, kl = self.sess.run( 4. [self.atrain_op, self.kl_mean], 5. {self.tfs: s, self.tfa: a, self.tfadv: adv, self.tflam: METHOD['lam']}) 6. if kl > 4*METHOD['kl_target']: # this in in google's paper 7. break 8. if kl < METHOD['kl_target'] / 1.5: # adaptive lambda, this is in OpenAI's paper 9. METHOD['lam'] /= 2 10. elif kl > METHOD['kl_target'] * 1.5: 11. METHOD['lam'] *= 2
如果使用散度惩罚方法,在每次更新的时候,要传入状态s、动作a、上一步得到的优势adv以及惩罚因子METHOD['lam'],在每次更新的同时,要计算散度kl。如果kl小于标准['kl_target']/1.5,将惩罚因子METHOD['lam']除以2;如果kl大于标准['kl_target']*1.5,将惩罚因子METHOD['lam']乘以2。动态调整惩罚因子,使得新策略和旧策略之间的分布相差不多。
1. else: # clipping method, find this is better (OpenAI's paper) 2. [self.sess.run(self.atrain_op, {self.tfs: s, self.tfa: a, self.tfadv: adv}) for _ in range(A_UPDATE_STEPS)]
如果使用截断的方法,传入状态s、动作a、优势adv,对策略更新多次。
PPO算法现在已经是OpenAI用于连续控制的首选算法。它易于实现且改进了onpolicy梯度更新的一些问题,提高了采样的效率以及算法的鲁棒性。
OpenAI的研究人员设计了具有互动能力的机器人,然后用PPO训练它们的策略。在这些基于Roboschool环境的实验中,可以用键盘给机器人设定新的目标位置;尽管输入的目标序列与用来训练机器人的序列不同,机器人仍然可以进行泛化。
除了Roboschool中这样的简单机器人,他们还用PPO教会复杂的仿真机器人走路,比如来自波士顿动力的Atlas的仿真模型。相比前面的双足机器人的17个独立关节,这个模型中独立关节的数目高达30个。也有一些其他研究人员已经成功借助PPO训练仿真机器人用精彩的跑酷动作跨越障碍。