这段内容基于Paddle框架实现了Advantage Actor-Critic(A2C)算法,以CartPole-v0环境为例,涵盖环境配置、状态与动作空间获取,定义Actor和Critic网络,计算TD目标,还展示了含详细输出的训练过程,包括状态处理、动作采样、奖励收集及参数更新等步骤。
☞☞☞AI 智能聊天, 问答助手, AI 智能搜索, 免费无限量使用 DeepSeek R1 模型☜☜☜

Advantage Actor-Critci
Advantage Actor-Critci是带基线的策略梯度方法,其中Advantage代表的是优势函数。可以把Advantage Actor-Critci看做Actor-Critci的改进版本。二者区别如下:
Actor-Critci
Advantage Actor-Critci
1.配置环境,导入需要的包
In [1]
import gymimport osimport sysfrom itertools import countimport paddlefrom paddle.distribution import Categorical
In [2]
print('python version',sys.version)print('paddle version',paddle.__version__)
python version 3.7.4 (default, Aug 13 2019, 20:35:49) [GCC 7.3.0]paddle version 2.2.1
2.搭建Advantage Actor-Critci 网络
2.1 获取当前运行设备(CPU or GPU)
In [3]
device=paddle.get_device()print(device)
cpu
2.2 加载摆车环境
In [4]
env=gym.make('CartPole-v0')print(env)
<TimeLimit<CartPoleEnv>>
2.3 获取状态空间大小与动作空间大小
关于 observation_space 与 action_space 代表的到底是什么内容,可以参考CSDN博文:https://wuxian.blog.csdn.net/article/details/89576003
目前我的理解:
observation_space 代表的是观测到的状态,也就是【摆车在轨道上的位置,杆子与竖直方向的夹角,小车速度,角度变化率】action_space 代表的是动作空间的大小,【左移,右移】In [5]
state_=env.observation_spaceprint(state_)state_size=state_.shape[0]print(state_size)action_=env.action_spaceprint(action_)action_size=action_.nprint(action_size)
Box(4,)4Discrete(2)2
2.4 定义学习率
In [6]
lr=0.001
2.5 定义Actor网络(实际定义的网络并没有严格按照图示,多了一个抽样)
截图来自 王树森 老师的书籍《深度强化学习》,Github直达链接:https://github.com/wangshusen/DRL
立即进入“豆包AI人工智官网入口”;
立即学习“豆包AI人工智能在线问答入口”;
**如果s是张量,那么使用卷积网络捕获特征向量;如果s是向量,那么使用全连接网络。**这句话摘抄于上述书籍中,个人认为其所述张量应为:大于1维的张量;注意区分标量,向量,矩阵,张量,个人理解如下:
标量是仅包含一个数字,也是0维的张量向量是数字组成的数组,也是1维的张量矩阵是向量组成的数组,也是2维的张量张量是深度学习的主要数据载体。标量,向量,矩阵是张量的一些特殊情况的常用名。
在(初中)高中数学中我们也学过向量,比如计算投影,这时向量是有方向的,但是在深度学习中,我理解的方向与此是有点区别的,即不用考虑方向。
In [7]
class Actor(paddle.nn.Layer): def __init__(self,state_size,action_size): super(Actor,self).__init__() self.state_size=state_size self.action_size=action_size self.l1=paddle.nn.Linear(self.state_size,128) self.l2=paddle.nn.Linear(128,256) self.l3=paddle.nn.Linear(256,self.action_size) self.relu=paddle.nn.ReLU() def forward(self,state): out=self.relu(self.l1(state)) out=self.relu(self.l2(out)) out=self.l3(out) # 根据动作的概率(加和为1),生成一个类别分布.paddle框架目前提供了三种分类函数【类别分类,正态分布,均匀分布】 distribution=Categorical(paddle.nn.functional.softmax(out,axis=-1)) return distribution
2.6 定义Critic网络
In [8]
class Critic(paddle.nn.Layer): def __init__(self,state_size,action_size): super(Critic,self).__init__() self.state_size=state_size self.action_size=action_size self.l1=paddle.nn.Linear(self.state_size,128) self.l2=paddle.nn.Linear(128,256) self.l3=paddle.nn.Linear(256,1) self.relu=paddle.nn.ReLU() def forward(self,state): # paddle.nn.ReLU() 与paddle.nn.functional.relu()的区别是:前者是面向对象,是class,在类的fordward中调用了后者;后者是面向过程,是def。 #out=paddle.nn.functional.relu(self.l1(state)) # relu(x)=max(0,x) #out=paddle.nn.functional.relu(self.l2(out)) out=self.relu(self.l1(state)) out=self.relu(self.l2(out)) value=self.l3(out) return value
3. 训练模型
3.1 定义模型存储路径
In [9]
actor_path='model/actor.pdparams'critic_path='model/critic.pdparams'
3.2 TD目标的计算
看可以看王树森的书,但是感觉不太一样,理论与实践存在一些差别,但是总体思想是一致的。
In [10]
def compute_returns(next_value,rewards,masks,gamma=0.99): # 相当于n+1时刻的价值 R=next_value returns=[] # masks=[1,1,1,1,1,...,1,1,1,0] # rewards=[r0,r1,r2,r3,r4,...,rn-3,rn-2,rn-1,rn] # 倒序 for step in reversed(range(len(rewards))): # TD目标的计算?? R=rewards[step]+gamma*R*masks[step] returns.insert(0,R) # returns=[t0,t1,t2,t3,t4,...,tn-3,tn-2,tn-1,tn] return returns
3.3 训练过程(一)
带有很多输出,方便了解每一行代码在做什么事情。 3.3与3.4内容是一样的,只不过3.3用来理解每一步的输出;3.4用来训练,保证输出的简洁。
In [11]
def trainIters1(actor,critic,n_iters): # 定义两个网络的优化器 optA=paddle.optimizer.Adam(lr,parameters=actor.parameters()) optC=paddle.optimizer.Adam(lr,parameters=critic.parameters()) for iter in range(n_iters): state=env.reset() # 环境初始化 state形如[ 0.04700963 -0.0149178 0.01601383 0.03912796] print(state) log_probs=[] # 对数概率密度函数 rewards=[] # 奖励列表 values=[] # 价值,critic网络的输出 masks=[] # done or not done entropy=[] # 交叉熵 env.reset() # 这里为什么又要重置环境?? # python的迭代器 count(初值=0,步长=1) for i in count(): # env.render() print('*************************** /n/n/n') print('i = ',i) # state转为paddle.tensor,不指定place,根据环境自行判断 state=paddle.to_tensor(state,dtype='float32') print(state) dist,value=actor(state),critic(state) print('actor distribution:',dist) # 这是一个分类分布(class) print('critic value:',value) # 从分布中进行采样(具体如何采样尚且不知,根据概率进行抽样??) action=dist.sample([1]) print('action is:',action,type(action)) # 环境执行一个时间步长,得到下一个状态,奖励,done,以及info(用不到,这里返回是空的字典) # env.step()的输入格式是什么?: # 将tensor拷贝到CPU上,不懂意图是什么,如果decive是在GPU上,则会有作用。 print('action.cpu()=',action.cpu()) # 删除axis=0上尺度为1的维度 print('action.cpu().squeeze(0)=',action.cpu().squeeze()) # 获取数值 print('action.cpu().squeeze(0).numpy()=',action.cpu().squeeze().numpy(),type(action.cpu().squeeze().numpy())) next_state,reward,done,info=env.step(action.cpu().squeeze(0).numpy()) print('next_state=',next_state) print('reward=',reward) print('done=',done) print('info=',info,type(info)) # 计算当前动作的对数概率密度函数 log_prob=dist.log_prob(action) print('log_prob = ',log_prob) # 加入到对数概率密度函数列表中 log_probs.append(log_prob) # 将价值(critic)网络的输出加入到价值列表中 values.append(value) # 将当前奖励加入到奖励列表中 rewards.append(paddle.to_tensor([reward],dtype='float32')) # 将当前的done标志加入到masks列表中,False=0, masks.append(paddle.to_tensor([1-done],dtype='float32')) print('1-done = ',1-done) state=next_state if done: if iter%10==0: # score 就是小车坚持了多少个时间步 print("Iteration:{},score:{}".format(iter,i)) break break #如果想要查看这个for循环每一步的输入,打开这个break # end for count() 小车运行一次,也就是进行一句游戏 print() next_state=paddle.to_tensor(next_state,dtype='float32') # 让critic网络根据next_state预测next_value next_value=critic(next_state) print('next_value = ',next_value) returns=compute_returns(next_value,rewards,masks) print('returns == ',returns) print('len returns = ',len(returns)) log_probs=paddle.concat(log_probs) # detach() 返回一个新的Tensor,从当前计算图分离。 # 作用是什么??? returns=paddle.concat(returns).detach() values=paddle.concat(values) print('concat log_probs: ',log_probs) print('concat returns: ',returns) print('concat values: ',values) # 负的 TD 误差 ,是让values接近returns,TD误差是 values-returns advantage=returns-values # actor的loss计算 # critic的loss计算 actor_loss=-(log_probs*advantage.detach()).mean() critic_loss=advantage.pow(2).mean() print('actor_loss: ',actor_loss) print('critic_loss: ',critic_loss) # 更新参数,完成一句游戏更新一次 # 梯度反向传播,注意actor网络是做梯度上升;critic网络是做梯度下降 optA.clear_grad() optC.clear_grad() actor_loss.backward() critic_loss.backward() optA.step() optC.step() break # 看一个iter的输出,注意打开这个break paddle.save(actor.state_dict(),actor_path) paddle.save(critic.state_dict(),critic_path) env.close print("111111111111111-------overover****************************************")
3.4 训练过程(二)
将很多输出注释掉,输出界面的简洁。3.3与3.4内容是一样的,只不过3.3用来理解每一步的输出;3.4用来训练,保证输出的简洁。
In [12]
def trainIters2(actor,critic,n_iters): # 定义两个网络的优化器 optA=paddle.optimizer.Adam(lr,parameters=actor.parameters()) optC=paddle.optimizer.Adam(lr,parameters=critic.parameters()) for iter in range(n_iters): state=env.reset() # 环境初始化 state形如[ 0.04700963 -0.0149178 0.01601383 0.03912796] #print(state) log_probs=[] # 对数概率密度函数 rewards=[] # 奖励列表 values=[] # 价值,critic网络的输出 masks=[] # done or not done entropy=[] # 交叉熵 env.reset() # 这里为什么又要重置环境?? # python的迭代器 count(初值=0,步长=1) for i in count(): # env.render() # 在线运行好像不支持 #print('*************************** /n/n/n') #print('i = ',i) # state转为paddle.tensor,不指定place,根据环境自行判断 state=paddle.to_tensor(state,dtype='float32') #print(state) dist,value=actor(state),critic(state) #print('actor distribution:',dist) # 这是一个分类分布(class) #print('critic value:',value) # 从分布中进行采样(具体如何采样尚且不知,根据概率进行抽样??) action=dist.sample([1]) #print('action is:',action,type(action)) # 环境执行一个时间步长,得到下一个状态,奖励,done,以及info(用不到,这里返回是空的字典) # env.step()的输入格式是什么?: # 将tensor拷贝到CPU上,不懂意图是什么,如果decive是在GPU上,则会有作用。 #print('action.cpu()=',action.cpu()) # 删除axis=0上尺度为1的维度 #print('action.cpu().squeeze(0)=',action.cpu().squeeze()) # 获取数值 #print('action.cpu().squeeze(0).numpy()=',action.cpu().squeeze().numpy(),type(action.cpu().squeeze().numpy())) next_state,reward,done,info=env.step(action.cpu().squeeze(0).numpy()) # print('next_state=',next_state) # print('reward=',reward) # print('done=',done) # print('info=',info,type(info)) # 计算当前动作的对数概率密度函数 log_prob=dist.log_prob(action) #print('log_prob = ',log_prob) # 加入到对数概率密度函数列表中 log_probs.append(log_prob) # 将价值(critic)网络的输出加入到价值列表中 values.append(value) # 将当前奖励加入到奖励列表中 rewards.append(paddle.to_tensor([reward],dtype='float32')) # 将当前的done标志加入到masks列表中,False=0, masks.append(paddle.to_tensor([1-done],dtype='float32')) #print('1-done = ',1-done) state=next_state if done: if iter%10==0: # score 就是小车坚持了多少个时间步 print("Iteration:{},score:{}".format(iter,i)) break # break #如果想要查看这个for循环每一步的输入,打开这个注释 # end for count() 小车运行一次,也就是进行一句游戏 #print() next_state=paddle.to_tensor(next_state,dtype='float32') # 让critic网络根据next_state预测next_value next_value=critic(next_state) #print('next_value = ',next_value) returns=compute_returns(next_value,rewards,masks) #print('returns == ',returns) #print('len returns = ',len(returns)) log_probs=paddle.concat(log_probs) # detach() 返回一个新的Tensor,从当前计算图分离。 # 作用是什么??? returns=paddle.concat(returns).detach() values=paddle.concat(values) # print('concat log_probs: ',log_probs) # print('concat returns: ',returns) # print('concat values: ',values) # 负的 TD 误差 ,是让values接近returns,TD误差是 values-returns advantage=returns-values # actor的loss计算 # critic的loss计算 actor_loss=-(log_probs*advantage.detach()).mean() critic_loss=advantage.pow(2).mean() # 更新参数,完成一句游戏更新一次 # 梯度反向传播,注意actor网络是做梯度上升;critic网络是做梯度下降 optA.clear_grad() optC.clear_grad() actor_loss.backward() critic_loss.backward() optA.step() optC.step() # break paddle.save(actor.state_dict(),actor_path) paddle.save(critic.state_dict(),critic_path) env.close print("222222222222222-------overover****************************************")
3.5 主函数,从文件中加载网络或是从零开始。可自行修改。
In [13]
def main(): actor=Actor(state_size,action_size) critic=Critic(state_size,action_size) # if os.path.exists(actor_path): # amodel_state_dict=paddle.load(actor_path) # actor.set_state_dict(amodel_state_dict) # print("load actor model from file") # if os.path.exists(critic_path): # cmodel_state_dict=paddle.load(critic_path) # critic.set_state_dict(cmodel_state_dict) # print("load critic model from file") # 在这里决定是用3.3 还是 用3.4 trainIters1(actor,critic,n_iters=200) #trainIters2(actor,critic,n_iters=200)main()
[-0.01245204 -0.0177881 -0.0447371 0.01200596]*************************** /n/n/ni = 0Tensor(shape=[4], dtype=float32, place=CPUPlace, stop_gradient=True, [-0.01245204, -0.01778810, -0.04473710, 0.01200596])actor distribution: critic value: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False, [0.00308495])action is: Tensor(shape=[1], dtype=int64, place=CPUPlace, stop_gradient=False, [0]) action.cpu()= Tensor(shape=[1], dtype=int64, place=CPUPlace, stop_gradient=False, [0])action.cpu().squeeze(0)= Tensor(shape=[], dtype=int64, place=CPUPlace, stop_gradient=False, 0)action.cpu().squeeze(0).numpy()= 0 next_state= [-0.0036292 -0.2412104 -0.02740825 0.25513875]reward= 1.0done= Falseinfo= {} log_prob = Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False, [-0.69534212])1-done = 1next_value = Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False, [-0.02449672])returns == [Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False, [0.97574824])]len returns = 1concat log_probs: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False, [-0.69534212])concat returns: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=True, [0.97574824])concat values: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False, [0.00308495])actor_loss: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False, [0.67633373])critic_loss: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False, [0.94607389])111111111111111-------overover****************************************
以上就是【强化】Advantage Actor-Critic (A2C):强化学习之摆车的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/47230.html
微信扫一扫
支付宝扫一扫