Contents
- Introduction
- Reinforcement Learning Fundamentals
- Core Components of Reinforcement Learning
- The Q-Learning Algorithm
- How Q-Learning Works
- The SARSA Algorithm
- Deep Q-Networks (DQN)
- DQN Architecture
- Improved DQN Algorithms
- Double DQN
- Policy Gradient Methods
- The REINFORCE Algorithm
- Actor-Critic Methods
- Comparing the Algorithms
- Hands-On Project: The CartPole Environment
- Advanced Topic: Multi-Agent Reinforcement Learning
- Conclusion
- Practical Tips
Introduction
Reinforcement learning (RL) is a major branch of machine learning that studies how an agent learns an optimal policy by interacting with an environment. From game AI to robot control, and from recommender systems to autonomous driving, reinforcement learning is changing how we interact with intelligent systems. This article walks through the core concepts of reinforcement learning, from the classic Q-Learning algorithm to modern deep reinforcement learning methods.
Reinforcement Learning Fundamentals
Core Components of Reinforcement Learning
A reinforcement learning system consists of the following key components:
- Agent: the entity that learns and makes decisions
- Environment: the external world the agent interacts with
- State: the current situation of the environment
- Action: an operation the agent can perform
- Reward: the feedback received after taking an action
- Policy: the agent's decision-making rule
```python
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
import random


class Environment:
    """Base class for reinforcement learning environments."""
    def __init__(self):
        self.state = None
        self.action_space = None
        self.observation_space = None

    def reset(self):
        """Reset the environment to its initial state."""
        raise NotImplementedError

    def step(self, action):
        """Execute an action and return (next_state, reward, done, info)."""
        raise NotImplementedError

    def render(self):
        """Visualize the environment."""
        raise NotImplementedError


# Example: a simple grid-world environment
class GridWorld(Environment):
    def __init__(self, width=5, height=5):
        super().__init__()
        self.width = width
        self.height = height
        self.action_space = 4  # up, down, left, right
        self.observation_space = width * height

        # Special positions
        self.start_pos = (0, 0)
        self.goal_pos = (width - 1, height - 1)
        self.obstacles = [(2, 2), (3, 2)]

        self.reset()

    def reset(self):
        self.agent_pos = self.start_pos
        self.steps = 0
        return self._get_state()

    def _get_state(self):
        """Convert the (x, y) position into a state index."""
        return self.agent_pos[0] * self.height + self.agent_pos[1]

    def step(self, action):
        self.steps += 1
        old_pos = self.agent_pos  # remember the previous position

        # Apply the action
        if action == 0:    # up
            new_pos = (self.agent_pos[0], max(0, self.agent_pos[1] - 1))
        elif action == 1:  # down
            new_pos = (self.agent_pos[0], min(self.height - 1, self.agent_pos[1] + 1))
        elif action == 2:  # left
            new_pos = (max(0, self.agent_pos[0] - 1), self.agent_pos[1])
        elif action == 3:  # right
            new_pos = (min(self.width - 1, self.agent_pos[0] + 1), self.agent_pos[1])
        else:
            new_pos = self.agent_pos

        # Block moves into obstacles
        if new_pos not in self.obstacles:
            self.agent_pos = new_pos

        # Compute the reward
        if self.agent_pos == self.goal_pos:
            reward = 10      # reached the goal
            done = True
        elif self.agent_pos == old_pos:
            reward = -1      # bumped into a wall or an obstacle
            done = False
        else:
            reward = -0.1    # small per-step penalty to encourage reaching the goal quickly
            done = False

        # Cap the episode length
        if self.steps >= 100:
            done = True

        return self._get_state(), reward, done, {}

    def render(self):
        """Print the grid world."""
        grid = np.full((self.height, self.width), '.', dtype=str)
        grid[self.goal_pos[1], self.goal_pos[0]] = 'G'
        for obs in self.obstacles:
            grid[obs[1], obs[0]] = '#'
        grid[self.agent_pos[1], self.agent_pos[0]] = 'A'
        print("\n".join(" ".join(row) for row in grid))
        print()


# Create the environment and take a few random steps
env = GridWorld(5, 5)
state = env.reset()
print(f"Initial state: {state}")
env.render()

for i in range(10):
    action = random.randint(0, 3)
    state, reward, done, info = env.step(action)
    print(f"Step {i+1}: action={action}, reward={reward}, state={state}, done={done}")
    env.render()
    if done:
        print("Task completed!")
        break
```
The Q-Learning Algorithm
Q-Learning is one of the most classic reinforcement learning algorithms. It finds an optimal policy by learning the action-value function Q.
How Q-Learning Works
Q-Learning updates Q-values using the Bellman equation:

$$Q(s,a) = Q(s,a) + \alpha\left[r + \gamma \max_{a'} Q(s',a') - Q(s,a)\right]$$
where:
- $\alpha$ is the learning rate
- $\gamma$ is the discount factor
- $r$ is the immediate reward
- $s'$ is the next state
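To make the update rule concrete, here is a minimal, self-contained sketch of a single Q-value update with made-up numbers; the transition, the Q-table contents, and the values of $\alpha$ and $\gamma$ are illustrative assumptions only, not outputs of the grid world above.

```python
import numpy as np

# Illustrative values (assumptions for this sketch only)
alpha, gamma = 0.1, 0.95            # learning rate and discount factor
q_table = np.zeros((25, 4))         # 25 states x 4 actions, as in a 5x5 grid world
q_table[6] = [0.0, 0.5, 0.2, 1.0]   # pretend Q-values for the next state s' = 6

s, a, r, s_next = 5, 3, -0.1, 6     # one observed transition (s, a, r, s')

# Q(s,a) <- Q(s,a) + alpha * (r + gamma * max_a' Q(s',a') - Q(s,a))
td_target = r + gamma * q_table[s_next].max()   # -0.1 + 0.95 * 1.0 = 0.85
td_error = td_target - q_table[s, a]            # 0.85 - 0.0 = 0.85
q_table[s, a] += alpha * td_error               # Q(5,3) becomes 0.085

print(q_table[s, a])  # 0.085
```

The full agent below wraps exactly this update in an epsilon-greedy training loop.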
```python
class QLearningAgent:
    def __init__(self, state_space, action_space, learning_rate=0.1,
                 discount_factor=0.95, epsilon=0.1):
        self.state_space = state_space
        self.action_space = action_space
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon

        # Q-table: state-action value function
        self.q_table = np.zeros((state_space, action_space))

        # Training statistics
        self.episode_rewards = []

    def choose_action(self, state, training=True):
        """Select an action with an epsilon-greedy policy."""
        if training and random.random() < self.epsilon:
            # Explore: pick a random action
            return random.randint(0, self.action_space - 1)
        else:
            # Exploit: pick the action with the highest Q-value
            return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state):
        """Update the Q-table."""
        current_q = self.q_table[state, action]
        # Maximum Q-value over the next state's actions
        max_next_q = np.max(self.q_table[next_state])
        # Bellman update
        new_q = current_q + self.learning_rate * (
            reward + self.discount_factor * max_next_q - current_q)
        self.q_table[state, action] = new_q

    def decay_epsilon(self, decay_rate=0.995, min_epsilon=0.01):
        """Decay the exploration rate."""
        self.epsilon = max(min_epsilon, self.epsilon * decay_rate)

    def train(self, env, episodes=1000, max_steps=100):
        """Train the agent."""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0

            for step in range(max_steps):
                action = self.choose_action(state)
                next_state, reward, done, _ = env.step(action)
                self.update(state, action, reward, next_state)
                state = next_state
                total_reward += reward
                if done:
                    break

            self.episode_rewards.append(total_reward)
            self.decay_epsilon()

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"Episode {episode}, Average Reward (last 100): {avg_reward:.2f}, "
                      f"Epsilon: {self.epsilon:.3f}")

    def test(self, env, episodes=10):
        """Evaluate the trained agent."""
        total_rewards = []
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            done = False
            steps = 0
            while not done and steps < 100:
                # Choose actions greedily (no exploration)
                action = self.choose_action(state, training=False)
                state, reward, done, _ = env.step(action)
                total_reward += reward
                steps += 1
            total_rewards.append(total_reward)
            print(f"Test Episode {episode+1}: Reward = {total_reward}, Steps = {steps}")
        print(f"Average Test Reward: {np.mean(total_rewards):.2f}")


# Train the Q-Learning agent
env = GridWorld(5, 5)
agent = QLearningAgent(state_space=env.observation_space,
                       action_space=env.action_space,
                       learning_rate=0.1, discount_factor=0.95, epsilon=0.1)

print("Training the Q-Learning agent...")
agent.train(env, episodes=1000)

# Evaluate the trained agent
print("\nTesting the trained agent:")
agent.test(env, episodes=5)

# Plot the learning curves
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(agent.episode_rewards)
plt.title("Episode Rewards")
plt.xlabel("Episode")
plt.ylabel("Total Reward")

plt.subplot(1, 2, 2)
# Moving average
window = 100
moving_avg = [np.mean(agent.episode_rewards[i:i+window])
              for i in range(len(agent.episode_rewards) - window)]
plt.plot(moving_avg)
plt.title(f"Moving Average Reward (window={window})")
plt.xlabel("Episode")
plt.ylabel("Average Reward")
plt.show()
```
The SARSA Algorithm
SARSA is another value-based temporal-difference algorithm. Unlike Q-Learning, it updates its Q-values using the next action that is actually taken by the current policy, rather than the greedy maximum.
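For reference, SARSA's update rule replaces the max over next actions in the Q-Learning rule with the Q-value of the action $a'$ actually chosen by the current policy:

$$Q(s,a) = Q(s,a) + \alpha\left[r + \gamma\, Q(s',a') - Q(s,a)\right]$$

This makes SARSA on-policy, whereas Q-Learning is off-policy.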
```python
class SARSAAgent:
    def __init__(self, state_space, action_space, learning_rate=0.1,
                 discount_factor=0.95, epsilon=0.1):
        self.state_space = state_space
        self.action_space = action_space
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.q_table = np.zeros((state_space, action_space))
        self.episode_rewards = []

    def choose_action(self, state, training=True):
        """Select an action (epsilon-greedy)."""
        if training and random.random() < self.epsilon:
            return random.randint(0, self.action_space - 1)
        else:
            return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state, next_action):
        """SARSA update rule."""
        current_q = self.q_table[state, action]
        next_q = self.q_table[next_state, next_action]
        new_q = current_q + self.learning_rate * (
            reward + self.discount_factor * next_q - current_q)
        self.q_table[state, action] = new_q

    def train(self, env, episodes=1000, max_steps=100):
        """Train the SARSA agent."""
        for episode in range(episodes):
            state = env.reset()
            action = self.choose_action(state)
            total_reward = 0

            for step in range(max_steps):
                # Take the action
                next_state, reward, done, _ = env.step(action)
                # Choose the next action with the current policy
                next_action = self.choose_action(next_state)
                # Update the Q-table
                self.update(state, action, reward, next_state, next_action)
                # Move on to the next state and action
                state = next_state
                action = next_action
                total_reward += reward
                if done:
                    break

            self.episode_rewards.append(total_reward)
            self.epsilon = max(0.01, self.epsilon * 0.995)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"SARSA Episode {episode}, Average Reward: {avg_reward:.2f}")


# Compare Q-Learning and SARSA
print("\nComparing Q-Learning and SARSA:")

# Train the SARSA agent
sarsa_agent = SARSAAgent(state_space=env.observation_space,
                         action_space=env.action_space,
                         learning_rate=0.1, discount_factor=0.95, epsilon=0.1)
print("Training the SARSA agent...")
sarsa_agent.train(env, episodes=1000)

# Plot the comparison
plt.figure(figsize=(10, 5))
window = 100

# Q-Learning moving average
q_avg = [np.mean(agent.episode_rewards[i:i+window])
         for i in range(len(agent.episode_rewards) - window)]
plt.plot(q_avg, label='Q-Learning')

# SARSA moving average
sarsa_avg = [np.mean(sarsa_agent.episode_rewards[i:i+window])
             for i in range(len(sarsa_agent.episode_rewards) - window)]
plt.plot(sarsa_avg, label='SARSA')

plt.title("Q-Learning vs SARSA Performance")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.legend()
plt.show()
```
Deep Q-Networks (DQN)
When the state space is large, the tabular Q-learning approach becomes impractical. A Deep Q-Network approximates the Q-function with a neural network instead.
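Concretely, the network parameters $\theta$ are trained to minimize the squared temporal-difference error against a periodically synchronized target network with parameters $\theta^{-}$; this is the loss that the `replay` method in the implementation below computes over minibatches drawn from the experience replay buffer:

$$L(\theta) = \mathbb{E}\left[\left(r + \gamma \max_{a'} Q(s',a';\theta^{-}) - Q(s,a;\theta)\right)^{2}\right]$$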
DQN Architecture
```python
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F


class DQN(nn.Module):
    """Deep Q-Network."""
    def __init__(self, state_size, action_size, hidden_size=64):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)


class ReplayBuffer:
    """Experience replay buffer."""
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)


class DQNAgent:
    def __init__(self, state_size, action_size, learning_rate=0.001):
        self.state_size = state_size
        self.action_size = action_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Online network and target network
        self.q_network = DQN(state_size, action_size).to(self.device)
        self.target_network = DQN(state_size, action_size).to(self.device)
        # Copy the online weights into the target network
        self.update_target_network()

        # Optimizer
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)

        # Experience replay
        self.memory = ReplayBuffer(10000)

        # Hyperparameters
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.gamma = 0.95
        self.batch_size = 64
        self.target_update_freq = 100

        self.episode_rewards = []

    def update_target_network(self):
        """Sync the target network with the online network."""
        self.target_network.load_state_dict(self.q_network.state_dict())

    def one_hot_encode(self, state):
        """Convert a discrete state index into a one-hot vector.

        States that are already feature vectors (e.g. CartPole observations)
        are passed through unchanged.
        """
        if isinstance(state, (int, np.integer)):
            state_vector = np.zeros(self.state_size)
            state_vector[state] = 1
            return state_vector
        return np.asarray(state, dtype=np.float32)

    def choose_action(self, state, training=True):
        """Select an action (epsilon-greedy)."""
        if training and random.random() < self.epsilon:
            return random.randint(0, self.action_size - 1)
        else:
            state_vector = self.one_hot_encode(state)
            state_tensor = torch.FloatTensor(state_vector).unsqueeze(0).to(self.device)
            q_values = self.q_network(state_tensor)
            return q_values.argmax().item()

    def remember(self, state, action, reward, next_state, done):
        """Store a transition."""
        self.memory.push(state, action, reward, next_state, done)

    def replay(self):
        """Train on a minibatch sampled from the replay buffer."""
        if len(self.memory) < self.batch_size:
            return

        # Sample a minibatch
        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Convert to tensors
        states = torch.FloatTensor([self.one_hot_encode(s) for s in states]).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor([self.one_hot_encode(s) for s in next_states]).to(self.device)
        dones = torch.BoolTensor(dones).to(self.device)

        # Current Q-values
        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))

        # Target Q-values from the target network
        next_q_values = self.target_network(next_states).max(1)[0].detach()
        target_q_values = rewards + (self.gamma * next_q_values * ~dones)

        # Loss
        loss = F.mse_loss(current_q_values.squeeze(), target_q_values)

        # Backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, env, episodes=1000):
        """Train the DQN agent."""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            steps = 0

            while steps < 200:  # cap the number of steps per episode
                action = self.choose_action(state)
                next_state, reward, done, _ = env.step(action)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                total_reward += reward
                steps += 1

                # Experience replay
                self.replay()

                if done:
                    break

            # Periodically update the target network
            if episode % self.target_update_freq == 0:
                self.update_target_network()

            # Decay epsilon
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

            self.episode_rewards.append(total_reward)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"DQN Episode {episode}, Average Reward: {avg_reward:.2f}, "
                      f"Epsilon: {self.epsilon:.3f}")


# Train the DQN agent
dqn_agent = DQNAgent(state_size=env.observation_space,
                     action_size=env.action_space,
                     learning_rate=0.001)

print("\nTraining the DQN agent...")
dqn_agent.train(env, episodes=1000)

# Plot the DQN learning curve
plt.figure(figsize=(10, 5))
window = 100
dqn_avg = [np.mean(dqn_agent.episode_rewards[i:i+window])
           for i in range(len(dqn_agent.episode_rewards) - window)]
plt.plot(dqn_avg)
plt.title("DQN Learning Curve")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.show()
```
Improved DQN Algorithms
Double DQN
Standard DQN tends to overestimate Q-values because the same network both selects and evaluates the maximizing next action. Double DQN decouples the two steps: the online network picks the next action, and the target network evaluates it.
```python
class DoubleDQNAgent(DQNAgent):
    """Double DQN mitigates the overestimation problem."""
    def __init__(self, state_size, action_size, learning_rate=0.001):
        super().__init__(state_size, action_size, learning_rate)

    def replay(self):
        """Double DQN experience replay."""
        if len(self.memory) < self.batch_size:
            return

        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor([self.one_hot_encode(s) for s in states]).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor([self.one_hot_encode(s) for s in next_states]).to(self.device)
        dones = torch.BoolTensor(dones).to(self.device)

        # Current Q-values
        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))

        # Double DQN target:
        # the online network selects the next action, the target network evaluates it
        next_actions = self.q_network(next_states).argmax(1)
        next_q_values = self.target_network(next_states).gather(
            1, next_actions.unsqueeze(1)).squeeze().detach()
        target_q_values = rewards + (self.gamma * next_q_values * ~dones)

        loss = F.mse_loss(current_q_values.squeeze(), target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
```
Policy Gradient Methods
The REINFORCE Algorithm
REINFORCE is the simplest policy gradient algorithm: it optimizes the policy function directly.
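Concretely, REINFORCE performs gradient ascent on the expected return using the Monte Carlo policy gradient, which the `update_policy` method below implements from the sampled log-probabilities and the (normalized) discounted returns $G_t$:

$$\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\!\left[\sum_t \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, G_t\right]$$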
```python
class PolicyNetwork(nn.Module):
    """Policy network."""
    def __init__(self, state_size, action_size, hidden_size=128):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return F.softmax(self.fc3(x), dim=-1)


class REINFORCEAgent:
    def __init__(self, state_size, action_size, learning_rate=0.01, gamma=0.99):
        self.state_size = state_size
        self.action_size = action_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Policy network
        self.policy_net = PolicyNetwork(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)

        self.gamma = gamma
        self.episode_rewards = []
        self.saved_log_probs = []
        self.rewards = []

    def one_hot_encode(self, state):
        """One-hot encode a state index; pass feature vectors through unchanged."""
        if isinstance(state, (int, np.integer)):
            state_vector = np.zeros(self.state_size)
            state_vector[state] = 1
            return state_vector
        return np.asarray(state, dtype=np.float32)

    def choose_action(self, state):
        """Sample an action from the policy."""
        state_vector = self.one_hot_encode(state)
        state_tensor = torch.FloatTensor(state_vector).to(self.device)
        probs = self.policy_net(state_tensor)

        # Sample from the action distribution
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()

        # Save the log-probability for training
        self.saved_log_probs.append(action_dist.log_prob(action))
        return action.item()

    def update_policy(self):
        """Update the policy."""
        R = 0
        policy_loss = []
        returns = []

        # Compute discounted returns
        for r in self.rewards[::-1]:
            R = r + self.gamma * R
            returns.insert(0, R)

        # Normalize the returns
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        # Policy loss
        for log_prob, R in zip(self.saved_log_probs, returns):
            policy_loss.append(-log_prob * R)

        # Gradient step
        self.optimizer.zero_grad()
        policy_loss = torch.stack(policy_loss).sum()
        policy_loss.backward()
        self.optimizer.step()

        # Clear the episode buffers
        self.saved_log_probs = []
        self.rewards = []

    def train(self, env, episodes=1000):
        """Train the REINFORCE agent."""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            done = False

            while not done:
                action = self.choose_action(state)
                state, reward, done, _ = env.step(action)
                self.rewards.append(reward)
                total_reward += reward

            # Update the policy at the end of each episode
            self.update_policy()

            self.episode_rewards.append(total_reward)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"REINFORCE Episode {episode}, Average Reward: {avg_reward:.2f}")


# Train the REINFORCE agent
reinforce_agent = REINFORCEAgent(state_size=env.observation_space,
                                 action_size=env.action_space,
                                 learning_rate=0.01, gamma=0.99)

print("\nTraining the REINFORCE agent...")
reinforce_agent.train(env, episodes=1000)
```
Actor-Critic Methods
Actor-Critic methods combine the strengths of value-based and policy-based approaches: an actor learns the policy while a critic estimates state values to reduce the variance of the policy gradient.
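In the implementation below, the critic's value estimate is used to form a per-step advantage, the discounted return minus the predicted state value, which then weights the actor's policy gradient in place of the raw return:

$$A(s_t) = G_t - V(s_t)$$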
```python
class ActorCritic(nn.Module):
    """Actor-Critic network."""
    def __init__(self, state_size, action_size, hidden_size=128):
        super(ActorCritic, self).__init__()
        self.shared = nn.Linear(state_size, hidden_size)
        # Actor head (policy)
        self.actor = nn.Linear(hidden_size, action_size)
        # Critic head (state value)
        self.critic = nn.Linear(hidden_size, 1)

    def forward(self, x):
        x = F.relu(self.shared(x))
        policy = F.softmax(self.actor(x), dim=-1)
        value = self.critic(x)
        return policy, value


class ActorCriticAgent:
    def __init__(self, state_size, action_size, learning_rate=0.01, gamma=0.99):
        self.state_size = state_size
        self.action_size = action_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Actor-Critic network
        self.ac_net = ActorCritic(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.ac_net.parameters(), lr=learning_rate)

        self.gamma = gamma
        self.episode_rewards = []
        self.log_probs = []
        self.values = []
        self.rewards = []

    def one_hot_encode(self, state):
        # One-hot encode a state index; pass feature vectors through unchanged
        if isinstance(state, (int, np.integer)):
            state_vector = np.zeros(self.state_size)
            state_vector[state] = 1
            return state_vector
        return np.asarray(state, dtype=np.float32)

    def choose_action(self, state):
        state_vector = self.one_hot_encode(state)
        state_tensor = torch.FloatTensor(state_vector).to(self.device)
        policy, value = self.ac_net(state_tensor)

        # Sample an action
        action_dist = torch.distributions.Categorical(policy)
        action = action_dist.sample()

        # Save quantities needed for the update
        self.log_probs.append(action_dist.log_prob(action))
        self.values.append(value)
        return action.item()

    def update(self):
        """Update the Actor-Critic network."""
        # Compute discounted returns
        returns = []
        R = 0
        for r in self.rewards[::-1]:
            R = r + self.gamma * R
            returns.insert(0, R)
        returns = torch.tensor(returns).float()

        # Advantages (detached so the actor loss does not backpropagate into the critic)
        values = torch.cat(self.values).squeeze()
        advantages = (returns - values).detach()

        # Actor loss (policy gradient)
        actor_loss = []
        for log_prob, advantage in zip(self.log_probs, advantages):
            actor_loss.append(-log_prob * advantage)

        # Critic loss (value regression)
        critic_loss = F.mse_loss(values, returns)

        # Total loss
        loss = torch.stack(actor_loss).sum() + critic_loss

        # Gradient step
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Clear the episode buffers
        self.log_probs = []
        self.values = []
        self.rewards = []

    def train(self, env, episodes=1000):
        """Train the Actor-Critic agent."""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            done = False

            while not done:
                action = self.choose_action(state)
                state, reward, done, _ = env.step(action)
                self.rewards.append(reward)
                total_reward += reward

            # Update the network at the end of each episode
            self.update()

            self.episode_rewards.append(total_reward)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"Actor-Critic Episode {episode}, Average Reward: {avg_reward:.2f}")


# Train the Actor-Critic agent
ac_agent = ActorCriticAgent(state_size=env.observation_space,
                            action_size=env.action_space,
                            learning_rate=0.01, gamma=0.99)

print("\nTraining the Actor-Critic agent...")
ac_agent.train(env, episodes=1000)
```
Comparing the Algorithms
```python
# Compare the performance of all algorithms
plt.figure(figsize=(15, 8))

# Moving average for each algorithm
window = 50

# Q-Learning
if len(agent.episode_rewards) > window:
    q_avg = [np.mean(agent.episode_rewards[i:i+window])
             for i in range(len(agent.episode_rewards) - window)]
    plt.plot(q_avg, label='Q-Learning', alpha=0.7)

# SARSA
if len(sarsa_agent.episode_rewards) > window:
    sarsa_avg = [np.mean(sarsa_agent.episode_rewards[i:i+window])
                 for i in range(len(sarsa_agent.episode_rewards) - window)]
    plt.plot(sarsa_avg, label='SARSA', alpha=0.7)

# DQN
if len(dqn_agent.episode_rewards) > window:
    dqn_avg = [np.mean(dqn_agent.episode_rewards[i:i+window])
               for i in range(len(dqn_agent.episode_rewards) - window)]
    plt.plot(dqn_avg, label='DQN', alpha=0.7)

# REINFORCE
if len(reinforce_agent.episode_rewards) > window:
    reinforce_avg = [np.mean(reinforce_agent.episode_rewards[i:i+window])
                     for i in range(len(reinforce_agent.episode_rewards) - window)]
    plt.plot(reinforce_avg, label='REINFORCE', alpha=0.7)

# Actor-Critic
if len(ac_agent.episode_rewards) > window:
    ac_avg = [np.mean(ac_agent.episode_rewards[i:i+window])
              for i in range(len(ac_agent.episode_rewards) - window)]
    plt.plot(ac_avg, label='Actor-Critic', alpha=0.7)

plt.title("Comparison of Different RL Algorithms")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
```
Hands-On Project: The CartPole Environment
Let's test our algorithms on the classic CartPole environment. Note that the code below assumes the classic Gym API (`reset()` returning only the observation and `step()` returning a 4-tuple), i.e. `gym` versions before 0.26. Because the agents' `one_hot_encode` helpers pass continuous observation vectors through unchanged, the same agent classes can be reused for CartPole's continuous state.
```python
import gym


class CartPoleAgent:
    def __init__(self, algorithm='dqn'):
        self.env = gym.make('CartPole-v1')
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.algorithm = algorithm

        # Create the agent for the chosen algorithm
        if algorithm == 'dqn':
            self.agent = DQNAgent(self.state_size, self.action_size)
            self.agent.epsilon = 0.1  # lower the exploration rate
        elif algorithm == 'reinforce':
            self.agent = REINFORCEAgent(self.state_size, self.action_size)
        elif algorithm == 'actor-critic':
            self.agent = ActorCriticAgent(self.state_size, self.action_size)

    def train(self, episodes=500):
        """Train the agent."""
        print(f"Training {self.algorithm} agent on CartPole...")

        for episode in range(episodes):
            state = self.env.reset()
            total_reward = 0
            done = False

            while not done:
                if self.algorithm == 'dqn':
                    action = self.agent.choose_action(state)
                    next_state, reward, done, _ = self.env.step(action)
                    self.agent.remember(state, action, reward, next_state, done)
                    self.agent.replay()
                else:
                    action = self.agent.choose_action(state)
                    next_state, reward, done, _ = self.env.step(action)
                    self.agent.rewards.append(reward)

                state = next_state
                total_reward += reward

            # Policy gradient methods update at the end of each episode
            if self.algorithm != 'dqn':
                if self.algorithm == 'reinforce':
                    self.agent.update_policy()
                else:
                    self.agent.update()

            self.agent.episode_rewards.append(total_reward)

            if episode % 50 == 0:
                avg_reward = np.mean(self.agent.episode_rewards[-50:])
                print(f"Episode {episode}, Average Reward: {avg_reward:.2f}")

                # Consider the task solved once the average reward reaches 195
                if avg_reward >= 195:
                    print(f"CartPole solved in {episode} episodes!")
                    break

    def test(self, episodes=10):
        """Evaluate the trained agent."""
        print(f"\nTesting {self.algorithm} agent...")
        total_rewards = []

        for episode in range(episodes):
            state = self.env.reset()
            total_reward = 0
            done = False

            while not done:
                if self.algorithm == 'dqn':
                    action = self.agent.choose_action(state, training=False)
                else:
                    action = self.agent.choose_action(state)
                state, reward, done, _ = self.env.step(action)
                total_reward += reward

            total_rewards.append(total_reward)
            print(f"Test Episode {episode+1}: Reward = {total_reward}")

        print(f"Average Test Reward: {np.mean(total_rewards):.2f}")


# Try the different algorithms
algorithms = ['dqn', 'reinforce', 'actor-critic']
results = {}

for algo in algorithms:
    cartpole = CartPoleAgent(algo)
    cartpole.train(episodes=500)
    cartpole.test()
    results[algo] = cartpole.agent.episode_rewards

# Plot the results
plt.figure(figsize=(12, 5))
for algo, rewards in results.items():
    window = 50
    if len(rewards) > window:
        avg = [np.mean(rewards[i:i+window]) for i in range(len(rewards) - window)]
        plt.plot(avg, label=algo)

plt.title("CartPole: Algorithm Comparison")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={50})")
plt.axhline(y=195, color='r', linestyle='--', label='Solved Threshold')
plt.legend()
plt.show()
```
Advanced Topic: Multi-Agent Reinforcement Learning
As a simple multi-agent example, the environment below places several agents in a shared grid world, and each agent is trained by its own independent DQN.
```python
class MultiAgentEnvironment:
    """A simple multi-agent environment."""
    def __init__(self, num_agents=2, grid_size=5):
        self.num_agents = num_agents
        self.grid_size = grid_size
        self.agents_pos = [(0, i) for i in range(num_agents)]
        self.goal_pos = (grid_size - 1, grid_size - 1)
        self.obstacles = [(2, 2), (3, 2)]

    def reset(self):
        self.agents_pos = [(0, i) for i in range(self.num_agents)]
        return [self._get_agent_state(i) for i in range(self.num_agents)]

    def _get_agent_state(self, agent_id):
        """Get the state index of a single agent."""
        x, y = self.agents_pos[agent_id]
        return x * self.grid_size + y

    def step(self, actions):
        """Apply the actions of all agents."""
        rewards = []
        dones = []
        next_states = []

        for i in range(self.num_agents):
            old_pos = self.agents_pos[i]
            action = actions[i]

            # Apply the action
            if action == 0:    # up
                new_pos = (self.agents_pos[i][0], max(0, self.agents_pos[i][1] - 1))
            elif action == 1:  # down
                new_pos = (self.agents_pos[i][0], min(self.grid_size - 1, self.agents_pos[i][1] + 1))
            elif action == 2:  # left
                new_pos = (max(0, self.agents_pos[i][0] - 1), self.agents_pos[i][1])
            elif action == 3:  # right
                new_pos = (min(self.grid_size - 1, self.agents_pos[i][0] + 1), self.agents_pos[i][1])
            else:
                new_pos = self.agents_pos[i]

            # Check for collisions with obstacles and other agents
            if new_pos not in self.obstacles and new_pos not in self.agents_pos[:i] + self.agents_pos[i+1:]:
                self.agents_pos[i] = new_pos

            # Compute the reward
            if self.agents_pos[i] == self.goal_pos:
                reward = 10
                done = True
            elif self.agents_pos[i] == old_pos:
                reward = -1
                done = False
            else:
                # Team shaping: reward staying close to the other agents
                min_dist = min([abs(self.agents_pos[i][0] - ap[0]) + abs(self.agents_pos[i][1] - ap[1])
                                for j, ap in enumerate(self.agents_pos) if i != j])
                reward = 0.1 * (4 - min_dist)  # the closer, the higher the reward
                done = False

            rewards.append(reward)
            dones.append(done)
            next_states.append(self._get_agent_state(i))

        return next_states, rewards, dones


# Multi-agent training
class MultiAgentTrainer:
    def __init__(self, env, num_agents):
        self.env = env
        self.num_agents = num_agents
        self.agents = []

        # One independent DQN per agent
        for _ in range(num_agents):
            agent = DQNAgent(env.grid_size * env.grid_size, 4)
            agent.epsilon = 0.1
            self.agents.append(agent)

    def train(self, episodes=1000):
        """Train the multi-agent system."""
        for episode in range(episodes):
            states = self.env.reset()
            total_rewards = [0] * self.num_agents
            done = False
            steps = 0

            # Cap episode length, since the environment itself has no step limit
            while not done and steps < 200:
                actions = []
                for i in range(self.num_agents):
                    action = self.agents[i].choose_action(states[i])
                    actions.append(action)

                next_states, rewards, dones = self.env.step(actions)

                for i in range(self.num_agents):
                    self.agents[i].remember(states[i], actions[i], rewards[i],
                                            next_states[i], dones[i])
                    self.agents[i].replay()
                    total_rewards[i] += rewards[i]

                states = next_states
                steps += 1
                done = any(dones)  # stop as soon as any agent reaches the goal

            if episode % 100 == 0:
                avg_reward = np.mean(total_rewards)
                print(f"Episode {episode}, Average Team Reward: {avg_reward:.2f}")


# Train the multi-agent system
multi_env = MultiAgentEnvironment(num_agents=2, grid_size=5)
trainer = MultiAgentTrainer(multi_env, 2)
trainer.train(episodes=500)
```
Conclusion
This article has covered reinforcement learning from the fundamentals to more advanced topics, including:
- Basic concepts: the core components and framework of reinforcement learning
- Value-based methods: classic algorithms such as Q-Learning and SARSA
- Deep reinforcement learning: DQN and its improved variants
- Policy gradient methods: REINFORCE and Actor-Critic
- Practical application: an implementation on the CartPole environment
- Advanced topic: multi-agent reinforcement learning
Reinforcement learning is a vibrant research field that continues to evolve. From simple tabular methods to deep networks, and from single-agent to multi-agent systems, it is being applied to increasingly complex problems.
Practical Tips
- Start with simple environments and gradually increase the complexity
- Understand the mathematics behind each algorithm
- Experiment with different hyperparameter combinations (see the sketch after this list)
- Use appropriate visualization tools to understand the learning process
- Take part in challenges on platforms such as OpenAI Gym and DeepMind Control
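As a starting point for hyperparameter experiments, here is a minimal sketch of a grid sweep over the `GridWorld` and `QLearningAgent` classes defined earlier in this article; the candidate learning rates and exploration rates are arbitrary choices for illustration.

```python
import itertools
import numpy as np

# Candidate hyperparameters (arbitrary illustrative values)
learning_rates = [0.05, 0.1, 0.5]
epsilons = [0.05, 0.1, 0.3]

results = {}
for lr, eps in itertools.product(learning_rates, epsilons):
    env = GridWorld(5, 5)
    sweep_agent = QLearningAgent(state_space=env.observation_space,
                                 action_space=env.action_space,
                                 learning_rate=lr, discount_factor=0.95, epsilon=eps)
    sweep_agent.train(env, episodes=300)
    # Score each setting by the mean reward over the last 100 episodes
    results[(lr, eps)] = np.mean(sweep_agent.episode_rewards[-100:])

best = max(results, key=results.get)
print(f"Best setting: lr={best[0]}, epsilon={best[1]}, avg reward={results[best]:.2f}")
```

A single summary score like this hides variance across random seeds, so averaging several runs per setting gives a more reliable comparison.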