
Reinforcement Learning: From Q-Learning to Deep Reinforcement Learning


张小明

Front-end Development Engineer


Contents

    • Introduction
    • Reinforcement Learning Basics
      • Core Elements of Reinforcement Learning
    • The Q-Learning Algorithm
      • How Q-Learning Works
      • The SARSA Algorithm
    • Deep Q-Networks (DQN)
      • DQN Architecture
      • Improved DQN Algorithms
        • Double DQN
    • Policy Gradient Methods
      • The REINFORCE Algorithm
    • Actor-Critic Methods
    • Comparing the Algorithms
    • Hands-On Project: The CartPole Environment
    • Advanced Topic: Multi-Agent Reinforcement Learning
    • Summary
    • Practical Tips

Introduction

Reinforcement Learning (RL) is a major branch of machine learning that studies how an agent learns an optimal policy by interacting with an environment. From game AI to robot control, and from recommender systems to autonomous driving, reinforcement learning is changing how we interact with intelligent systems. This article works through the core concepts of reinforcement learning, from the classic Q-Learning algorithm to modern deep reinforcement learning methods.

Reinforcement Learning Basics

Core Elements of Reinforcement Learning

A reinforcement learning system consists of the following key components:

  1. Agent: the entity that learns and makes decisions
  2. Environment: the external world the agent interacts with
  3. State: the current situation of the environment
  4. Action: an operation the agent can perform
  5. Reward: the feedback received after taking an action
  6. Policy: the agent's rule for choosing actions
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
import random

class Environment:
    """Base class for reinforcement learning environments."""
    def __init__(self):
        self.state = None
        self.action_space = None
        self.observation_space = None

    def reset(self):
        """Reset the environment to its initial state."""
        raise NotImplementedError

    def step(self, action):
        """Execute an action and return (next_state, reward, done, info)."""
        raise NotImplementedError

    def render(self):
        """Visualize the environment."""
        raise NotImplementedError

# Example: a simple grid-world environment
class GridWorld(Environment):
    def __init__(self, width=5, height=5):
        super().__init__()
        self.width = width
        self.height = height
        self.action_space = 4  # up, down, left, right
        self.observation_space = width * height

        # Special positions
        self.start_pos = (0, 0)
        self.goal_pos = (width - 1, height - 1)
        self.obstacles = [(2, 2), (3, 2)]

        self.reset()

    def reset(self):
        self.agent_pos = self.start_pos
        self.steps = 0
        return self._get_state()

    def _get_state(self):
        """Convert the (x, y) position to a state index."""
        return self.agent_pos[0] * self.height + self.agent_pos[1]

    def step(self, action):
        self.steps += 1
        old_pos = self.agent_pos  # remember the old position

        # Apply the action
        if action == 0:    # up
            new_pos = (self.agent_pos[0], max(0, self.agent_pos[1] - 1))
        elif action == 1:  # down
            new_pos = (self.agent_pos[0], min(self.height - 1, self.agent_pos[1] + 1))
        elif action == 2:  # left
            new_pos = (max(0, self.agent_pos[0] - 1), self.agent_pos[1])
        elif action == 3:  # right
            new_pos = (min(self.width - 1, self.agent_pos[0] + 1), self.agent_pos[1])
        else:
            new_pos = self.agent_pos

        # Block moves into obstacles
        if new_pos not in self.obstacles:
            self.agent_pos = new_pos

        # Compute the reward
        if self.agent_pos == self.goal_pos:
            reward = 10          # reached the goal
            done = True
        elif self.agent_pos == old_pos:
            reward = -1          # bumped into a wall or an obstacle
            done = False
        else:
            reward = -0.1        # small per-step penalty to encourage reaching the goal quickly
            done = False

        # Cap the episode length
        if self.steps >= 100:
            done = True

        return self._get_state(), reward, done, {}

    def render(self):
        """Print the grid world."""
        grid = np.full((self.height, self.width), '.', dtype=str)
        grid[self.goal_pos[1], self.goal_pos[0]] = 'G'
        for obs in self.obstacles:
            grid[obs[1], obs[0]] = '#'
        grid[self.agent_pos[1], self.agent_pos[0]] = 'A'
        print("\n".join(" ".join(row) for row in grid))
        print()

# Create the environment and take a few random steps
env = GridWorld(5, 5)
state = env.reset()
print(f"Initial state: {state}")
env.render()

for i in range(10):
    action = random.randint(0, 3)
    state, reward, done, info = env.step(action)
    print(f"Step {i + 1}: action={action}, reward={reward}, state={state}, done={done}")
    env.render()
    if done:
        print("Episode finished!")
        break

The Q-Learning Algorithm

Q-Learning is one of the most classic reinforcement learning algorithms. It finds an optimal policy by learning the action-value function Q.

How Q-Learning Works

Q-Learning updates its Q values with the Bellman equation:
Q(s,a) = Q(s,a) + \alpha \left[ r + \gamma \max_{a'} Q(s',a') - Q(s,a) \right]

where:

  • α is the learning rate
  • γ is the discount factor
  • r is the immediate reward
  • s' is the next state
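
To make the update rule concrete, here is a tiny, hand-checked example of a single Q-Learning update. The Q-table, the transition, and the hyperparameters are made up purely for illustration; the full agent below does exactly this inside its update() method.

# One Q-Learning update on made-up numbers (illustrative only)
import numpy as np

alpha, gamma = 0.1, 0.95            # learning rate and discount factor
q = np.zeros((2, 2))                # tiny Q-table: 2 states x 2 actions
q[1] = [0.5, 2.0]                   # assume the next state already has some Q values

s, a, r, s_next = 0, 1, -0.1, 1     # one observed transition
td_target = r + gamma * q[s_next].max()      # -0.1 + 0.95 * 2.0 = 1.8
q[s, a] += alpha * (td_target - q[s, a])     # 0 + 0.1 * (1.8 - 0) = 0.18
print(q[s, a])                      # 0.18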
class QLearningAgent:
    def __init__(self, state_space, action_space, learning_rate=0.1,
                 discount_factor=0.95, epsilon=0.1):
        self.state_space = state_space
        self.action_space = action_space
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon

        # Q-table: state-action value function
        self.q_table = np.zeros((state_space, action_space))

        # Statistics
        self.episode_rewards = []

    def choose_action(self, state, training=True):
        """Select an action with an epsilon-greedy policy."""
        if training and random.random() < self.epsilon:
            # Explore: pick a random action
            return random.randint(0, self.action_space - 1)
        else:
            # Exploit: pick the action with the highest Q value
            return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state):
        """Update the Q-table."""
        # Current Q value
        current_q = self.q_table[state, action]
        # Maximum Q value over the next state's actions
        max_next_q = np.max(self.q_table[next_state])
        # Bellman update
        new_q = current_q + self.learning_rate * (
            reward + self.discount_factor * max_next_q - current_q)
        self.q_table[state, action] = new_q

    def decay_epsilon(self, decay_rate=0.995, min_epsilon=0.01):
        """Decay the exploration rate."""
        self.epsilon = max(min_epsilon, self.epsilon * decay_rate)

    def train(self, env, episodes=1000, max_steps=100):
        """Train the agent."""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0

            for step in range(max_steps):
                action = self.choose_action(state)              # choose an action
                next_state, reward, done, _ = env.step(action)  # take the action
                self.update(state, action, reward, next_state)  # update the Q-table
                state = next_state
                total_reward += reward
                if done:
                    break

            # Record the episode reward and decay epsilon
            self.episode_rewards.append(total_reward)
            self.decay_epsilon()

            # Print progress
            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"Episode {episode}, Average Reward (last 100): {avg_reward:.2f}, "
                      f"Epsilon: {self.epsilon:.3f}")

    def test(self, env, episodes=10):
        """Evaluate the trained agent."""
        total_rewards = []
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            done = False
            steps = 0
            while not done and steps < 100:
                # Greedy action (no exploration)
                action = self.choose_action(state, training=False)
                state, reward, done, _ = env.step(action)
                total_reward += reward
                steps += 1
            total_rewards.append(total_reward)
            print(f"Test Episode {episode + 1}: Reward = {total_reward}, Steps = {steps}")
        print(f"Average Test Reward: {np.mean(total_rewards):.2f}")

# Train a Q-Learning agent
env = GridWorld(5, 5)
agent = QLearningAgent(state_space=env.observation_space,
                       action_space=env.action_space,
                       learning_rate=0.1, discount_factor=0.95, epsilon=0.1)

print("Training the Q-Learning agent...")
agent.train(env, episodes=1000)

# Evaluate the trained agent
print("\nTesting the trained agent:")
agent.test(env, episodes=5)

# Plot the learning curves
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(agent.episode_rewards)
plt.title("Episode Rewards")
plt.xlabel("Episode")
plt.ylabel("Total Reward")

plt.subplot(1, 2, 2)
# Moving average
window = 100
moving_avg = [np.mean(agent.episode_rewards[i:i + window])
              for i in range(len(agent.episode_rewards) - window)]
plt.plot(moving_avg)
plt.title(f"Moving Average Reward (window={window})")
plt.xlabel("Episode")
plt.ylabel("Average Reward")
plt.show()

The SARSA Algorithm

SARSA is another value-based temporal-difference algorithm. Unlike Q-Learning, which is off-policy, SARSA is on-policy: it updates Q values using the action that is actually taken in the next state.
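
For reference, the SARSA update rule is the standard textbook form

Q(s,a) = Q(s,a) + \alpha \left[ r + \gamma Q(s',a') - Q(s,a) \right]

where a' is the action the agent actually selects in s'. The only difference from the Q-Learning update above is that the \max_{a'} is replaced by the value of the chosen action, which is what makes SARSA on-policy.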

class SARSAAgent:
    def __init__(self, state_space, action_space, learning_rate=0.1,
                 discount_factor=0.95, epsilon=0.1):
        self.state_space = state_space
        self.action_space = action_space
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.q_table = np.zeros((state_space, action_space))
        self.episode_rewards = []

    def choose_action(self, state, training=True):
        """Epsilon-greedy action selection."""
        if training and random.random() < self.epsilon:
            return random.randint(0, self.action_space - 1)
        else:
            return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state, next_action):
        """SARSA update rule (uses the action actually taken next)."""
        current_q = self.q_table[state, action]
        next_q = self.q_table[next_state, next_action]
        new_q = current_q + self.learning_rate * (
            reward + self.discount_factor * next_q - current_q)
        self.q_table[state, action] = new_q

    def train(self, env, episodes=1000, max_steps=100):
        """Train the SARSA agent."""
        for episode in range(episodes):
            state = env.reset()
            action = self.choose_action(state)
            total_reward = 0

            for step in range(max_steps):
                next_state, reward, done, _ = env.step(action)  # take the action
                next_action = self.choose_action(next_state)    # choose the next action
                self.update(state, action, reward, next_state, next_action)
                # Advance both state and action
                state = next_state
                action = next_action
                total_reward += reward
                if done:
                    break

            self.episode_rewards.append(total_reward)
            self.epsilon = max(0.01, self.epsilon * 0.995)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"SARSA Episode {episode}, Average Reward: {avg_reward:.2f}")

# Compare Q-Learning and SARSA
print("\nComparing Q-Learning and SARSA:")

# Train a SARSA agent
sarsa_agent = SARSAAgent(state_space=env.observation_space,
                         action_space=env.action_space,
                         learning_rate=0.1, discount_factor=0.95, epsilon=0.1)
print("Training the SARSA agent...")
sarsa_agent.train(env, episodes=1000)

# Plot the comparison
plt.figure(figsize=(10, 5))
window = 100

# Q-Learning moving average
q_avg = [np.mean(agent.episode_rewards[i:i + window])
         for i in range(len(agent.episode_rewards) - window)]
plt.plot(q_avg, label='Q-Learning')

# SARSA moving average
sarsa_avg = [np.mean(sarsa_agent.episode_rewards[i:i + window])
             for i in range(len(sarsa_agent.episode_rewards) - window)]
plt.plot(sarsa_avg, label='SARSA')

plt.title("Q-Learning vs SARSA Performance")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.legend()
plt.show()

Deep Q-Networks (DQN)

When the state space is large, the tabular Q-table approach becomes infeasible. A Deep Q-Network (DQN) uses a neural network to approximate the Q function, together with experience replay and a separate target network to stabilize training.
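
Concretely, the training objective is the squared temporal-difference error, with the bootstrap target computed by a periodically synchronized copy of the network (the target network) over transitions sampled from a replay buffer D:

L(\theta) = \mathbb{E}_{(s,a,r,s') \sim D} \left[ \left( r + \gamma \max_{a'} Q(s',a';\theta^{-}) - Q(s,a;\theta) \right)^2 \right]

where \theta are the online network's parameters and \theta^{-} are the target network's parameters. The implementation below follows this recipe.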

DQN Architecture

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class DQN(nn.Module):
    """Deep Q-Network."""
    def __init__(self, state_size, action_size, hidden_size=64):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

class ReplayBuffer:
    """Experience replay buffer."""
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)

class DQNAgent:
    def __init__(self, state_size, action_size, learning_rate=0.001):
        self.state_size = state_size
        self.action_size = action_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Online network and target network
        self.q_network = DQN(state_size, action_size).to(self.device)
        self.target_network = DQN(state_size, action_size).to(self.device)
        # Copy the online network's weights into the target network
        self.update_target_network()

        # Optimizer
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)

        # Experience replay
        self.memory = ReplayBuffer(10000)

        # Hyperparameters
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.gamma = 0.95
        self.batch_size = 64
        self.target_update_freq = 100

        self.episode_rewards = []

    def update_target_network(self):
        """Sync the target network with the online network."""
        self.target_network.load_state_dict(self.q_network.state_dict())

    def one_hot_encode(self, state):
        """Convert a state index into a one-hot vector."""
        state_vector = np.zeros(self.state_size)
        state_vector[state] = 1
        return state_vector

    def choose_action(self, state, training=True):
        """Epsilon-greedy action selection."""
        if training and random.random() < self.epsilon:
            return random.randint(0, self.action_size - 1)
        else:
            state_vector = self.one_hot_encode(state)
            state_tensor = torch.FloatTensor(state_vector).unsqueeze(0).to(self.device)
            q_values = self.q_network(state_tensor)
            return q_values.argmax().item()

    def remember(self, state, action, reward, next_state, done):
        """Store a transition."""
        self.memory.push(state, action, reward, next_state, done)

    def replay(self):
        """Train on a minibatch sampled from the replay buffer."""
        if len(self.memory) < self.batch_size:
            return

        # Sample a batch
        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Convert to tensors
        states = torch.FloatTensor(np.array([self.one_hot_encode(s) for s in states])).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(np.array([self.one_hot_encode(s) for s in next_states])).to(self.device)
        dones = torch.BoolTensor(dones).to(self.device)

        # Current Q values
        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))

        # Target Q values
        next_q_values = self.target_network(next_states).max(1)[0].detach()
        target_q_values = rewards + (self.gamma * next_q_values * ~dones)

        # Loss
        loss = F.mse_loss(current_q_values.squeeze(), target_q_values)

        # Backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, env, episodes=1000):
        """Train the DQN agent."""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            steps = 0

            while steps < 200:  # cap the episode length
                action = self.choose_action(state)              # choose an action
                next_state, reward, done, _ = env.step(action)  # take the action
                self.remember(state, action, reward, next_state, done)  # store the transition
                state = next_state
                total_reward += reward
                steps += 1
                self.replay()                                   # experience replay
                if done:
                    break

            # Periodically sync the target network
            if episode % self.target_update_freq == 0:
                self.update_target_network()

            # Decay epsilon
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

            # Record the episode reward
            self.episode_rewards.append(total_reward)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"DQN Episode {episode}, Average Reward: {avg_reward:.2f}, "
                      f"Epsilon: {self.epsilon:.3f}")

# Train a DQN agent
dqn_agent = DQNAgent(state_size=env.observation_space,
                     action_size=env.action_space,
                     learning_rate=0.001)

print("\nTraining the DQN agent...")
dqn_agent.train(env, episodes=1000)

# Plot the DQN learning curve
plt.figure(figsize=(10, 5))
window = 100
dqn_avg = [np.mean(dqn_agent.episode_rewards[i:i + window])
           for i in range(len(dqn_agent.episode_rewards) - window)]
plt.plot(dqn_avg)
plt.title("DQN Learning Curve")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.show()

Improved DQN Algorithms

Double DQN
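Plain DQN tends to overestimate Q values because the same network both selects and evaluates the next action. Double DQN decouples the two: the online network picks the next action and the target network scores it,

y = r + \gamma \, Q\left(s', \arg\max_{a'} Q(s',a';\theta);\, \theta^{-}\right)

which is exactly the target computed in the replay() method below.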
class DoubleDQNAgent(DQNAgent):
    """Double DQN mitigates the overestimation of Q values."""
    def __init__(self, state_size, action_size, learning_rate=0.001):
        super().__init__(state_size, action_size, learning_rate)

    def replay(self):
        """Double DQN experience replay."""
        if len(self.memory) < self.batch_size:
            return

        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(np.array([self.one_hot_encode(s) for s in states])).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(np.array([self.one_hot_encode(s) for s in next_states])).to(self.device)
        dones = torch.BoolTensor(dones).to(self.device)

        # Current Q values
        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))

        # Double DQN target:
        # select the next action with the online network, evaluate it with the target network
        next_actions = self.q_network(next_states).argmax(1)
        next_q_values = self.target_network(next_states).gather(
            1, next_actions.unsqueeze(1)).squeeze().detach()
        target_q_values = rewards + (self.gamma * next_q_values * ~dones)

        loss = F.mse_loss(current_q_values.squeeze(), target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

Policy Gradient Methods

The REINFORCE Algorithm

REINFORCE is the simplest policy gradient algorithm: it optimizes the policy function directly.
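
The quantity REINFORCE ascends is the Monte Carlo policy gradient. Writing G_t for the discounted return from step t, the standard estimator is

\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta} \left[ \sum_t G_t \, \nabla_\theta \log \pi_\theta(a_t \mid s_t) \right]

and the code below implements it as the accumulated -log_prob * R loss, with the returns normalized within each episode as a simple variance-reduction step.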

class PolicyNetwork(nn.Module):
    """Policy network."""
    def __init__(self, state_size, action_size, hidden_size=128):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return F.softmax(self.fc3(x), dim=-1)

class REINFORCEAgent:
    def __init__(self, state_size, action_size, learning_rate=0.01, gamma=0.99):
        self.state_size = state_size
        self.action_size = action_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Policy network
        self.policy_net = PolicyNetwork(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)

        self.gamma = gamma
        self.episode_rewards = []
        self.saved_log_probs = []
        self.rewards = []

    def one_hot_encode(self, state):
        """One-hot encode a state index."""
        state_vector = np.zeros(self.state_size)
        state_vector[state] = 1
        return state_vector

    def choose_action(self, state):
        """Sample an action from the policy."""
        state_vector = self.one_hot_encode(state)
        state_tensor = torch.FloatTensor(state_vector).to(self.device)
        probs = self.policy_net(state_tensor)

        # Sample from the action distribution
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()

        # Save the log-probability for training
        self.saved_log_probs.append(action_dist.log_prob(action))
        return action.item()

    def update_policy(self):
        """Update the policy at the end of an episode."""
        R = 0
        policy_loss = []
        returns = []

        # Compute discounted returns
        for r in self.rewards[::-1]:
            R = r + self.gamma * R
            returns.insert(0, R)

        # Normalize the returns
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        # Policy gradient loss
        for log_prob, R in zip(self.saved_log_probs, returns):
            policy_loss.append(-log_prob * R)

        # Update the policy
        self.optimizer.zero_grad()
        policy_loss = torch.stack(policy_loss).sum()
        policy_loss.backward()
        self.optimizer.step()

        # Clear the episode buffers
        self.saved_log_probs = []
        self.rewards = []

    def train(self, env, episodes=1000):
        """Train the REINFORCE agent."""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            done = False

            while not done:
                action = self.choose_action(state)          # sample an action
                state, reward, done, _ = env.step(action)   # take the action
                self.rewards.append(reward)                  # record the reward
                total_reward += reward

            # Update the policy after the episode
            self.update_policy()

            # Record the episode reward
            self.episode_rewards.append(total_reward)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"REINFORCE Episode {episode}, Average Reward: {avg_reward:.2f}")

# Train a REINFORCE agent
reinforce_agent = REINFORCEAgent(state_size=env.observation_space,
                                 action_size=env.action_space,
                                 learning_rate=0.01, gamma=0.99)

print("\nTraining the REINFORCE agent...")
reinforce_agent.train(env, episodes=1000)

Actor-Critic Methods

Actor-Critic methods combine the advantages of value-based and policy-based approaches: an actor learns the policy while a critic learns a value function that reduces the variance of the policy gradient.
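
In the implementation below, the actor is trained with the advantage A_t = G_t - V_\phi(s_t) in place of the raw return, and the critic is regressed toward the return, giving the combined loss

L = -\sum_t A_t \log \pi_\theta(a_t \mid s_t) + \frac{1}{T} \sum_t \left( G_t - V_\phi(s_t) \right)^2

Using the advantage rather than G_t itself lowers the variance of the policy gradient without changing its expectation.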

class ActorCritic(nn.Module):
    """Actor-Critic network."""
    def __init__(self, state_size, action_size, hidden_size=128):
        super(ActorCritic, self).__init__()
        self.shared = nn.Linear(state_size, hidden_size)
        # Actor head (policy)
        self.actor = nn.Linear(hidden_size, action_size)
        # Critic head (state value)
        self.critic = nn.Linear(hidden_size, 1)

    def forward(self, x):
        x = F.relu(self.shared(x))
        policy = F.softmax(self.actor(x), dim=-1)
        value = self.critic(x)
        return policy, value

class ActorCriticAgent:
    def __init__(self, state_size, action_size, learning_rate=0.01, gamma=0.99):
        self.state_size = state_size
        self.action_size = action_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Actor-Critic network
        self.ac_net = ActorCritic(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.ac_net.parameters(), lr=learning_rate)

        self.gamma = gamma
        self.episode_rewards = []
        self.log_probs = []
        self.values = []
        self.rewards = []

    def one_hot_encode(self, state):
        state_vector = np.zeros(self.state_size)
        state_vector[state] = 1
        return state_vector

    def choose_action(self, state):
        state_vector = self.one_hot_encode(state)
        state_tensor = torch.FloatTensor(state_vector).to(self.device)
        policy, value = self.ac_net(state_tensor)

        # Sample an action
        action_dist = torch.distributions.Categorical(policy)
        action = action_dist.sample()

        # Save the log-probability and value estimate for training
        self.log_probs.append(action_dist.log_prob(action))
        self.values.append(value)
        return action.item()

    def update(self):
        """Update the Actor-Critic network."""
        # Discounted returns
        returns = []
        R = 0
        for r in self.rewards[::-1]:
            R = r + self.gamma * R
            returns.insert(0, R)
        returns = torch.tensor(returns).float()

        # Advantages (detached so the actor loss does not backpropagate into the critic)
        values = torch.cat(self.values).squeeze()
        advantages = (returns - values).detach()

        # Actor loss (policy gradient)
        actor_loss = []
        for log_prob, advantage in zip(self.log_probs, advantages):
            actor_loss.append(-log_prob * advantage)

        # Critic loss (value regression)
        critic_loss = F.mse_loss(values, returns)

        # Total loss
        loss = torch.stack(actor_loss).sum() + critic_loss

        # Update the network
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Clear the episode buffers
        self.log_probs = []
        self.values = []
        self.rewards = []

    def train(self, env, episodes=1000):
        """Train the Actor-Critic agent."""
        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            done = False

            while not done:
                action = self.choose_action(state)          # sample an action
                state, reward, done, _ = env.step(action)   # take the action
                self.rewards.append(reward)                  # record the reward
                total_reward += reward

            # Update the network after the episode
            self.update()

            # Record the episode reward
            self.episode_rewards.append(total_reward)

            if episode % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"Actor-Critic Episode {episode}, Average Reward: {avg_reward:.2f}")

# Train an Actor-Critic agent
ac_agent = ActorCriticAgent(state_size=env.observation_space,
                            action_size=env.action_space,
                            learning_rate=0.01, gamma=0.99)

print("\nTraining the Actor-Critic agent...")
ac_agent.train(env, episodes=1000)

Comparing the Algorithms

# Compare the performance of all algorithms
plt.figure(figsize=(15, 8))

# Moving-average window used for every algorithm
window = 50

# Q-Learning
if len(agent.episode_rewards) > window:
    q_avg = [np.mean(agent.episode_rewards[i:i + window])
             for i in range(len(agent.episode_rewards) - window)]
    plt.plot(q_avg, label='Q-Learning', alpha=0.7)

# SARSA
if len(sarsa_agent.episode_rewards) > window:
    sarsa_avg = [np.mean(sarsa_agent.episode_rewards[i:i + window])
                 for i in range(len(sarsa_agent.episode_rewards) - window)]
    plt.plot(sarsa_avg, label='SARSA', alpha=0.7)

# DQN
if len(dqn_agent.episode_rewards) > window:
    dqn_avg = [np.mean(dqn_agent.episode_rewards[i:i + window])
               for i in range(len(dqn_agent.episode_rewards) - window)]
    plt.plot(dqn_avg, label='DQN', alpha=0.7)

# REINFORCE
if len(reinforce_agent.episode_rewards) > window:
    reinforce_avg = [np.mean(reinforce_agent.episode_rewards[i:i + window])
                     for i in range(len(reinforce_agent.episode_rewards) - window)]
    plt.plot(reinforce_avg, label='REINFORCE', alpha=0.7)

# Actor-Critic
if len(ac_agent.episode_rewards) > window:
    ac_avg = [np.mean(ac_agent.episode_rewards[i:i + window])
              for i in range(len(ac_agent.episode_rewards) - window)]
    plt.plot(ac_avg, label='Actor-Critic', alpha=0.7)

plt.title("Comparison of Different RL Algorithms")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

Hands-On Project: The CartPole Environment

Let's test our algorithms on the classic CartPole environment from OpenAI Gym.
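
A quick note on versions: the code in this section assumes the classic gym API, where env.reset() returns only the observation and env.step() returns a 4-tuple. On gym >= 0.26 (or gymnasium), reset() returns (obs, info) and step() returns (obs, reward, terminated, truncated, info), so a thin compatibility shim like the sketch below can restore the old behaviour (the OldAPIWrapper name is just an illustration).

# Minimal compatibility shim for gym >= 0.26 / gymnasium (illustrative sketch)
class OldAPIWrapper:
    def __init__(self, env):
        self.env = env
        # Expose the spaces the rest of the code expects
        self.observation_space = env.observation_space
        self.action_space = env.action_space

    def reset(self):
        obs, _info = self.env.reset()
        return obs

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        return obs, reward, terminated or truncated, info

# Usage: self.env = OldAPIWrapper(gym.make('CartPole-v1'))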

import gym

class CartPoleAgent:
    def __init__(self, algorithm='dqn'):
        self.env = gym.make('CartPole-v1')  # assumes the classic gym API (see the note above)
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.algorithm = algorithm

        # Create the agent for the chosen algorithm
        if algorithm == 'dqn':
            self.agent = DQNAgent(self.state_size, self.action_size)
            self.agent.epsilon = 0.1  # lower the exploration rate
        elif algorithm == 'reinforce':
            self.agent = REINFORCEAgent(self.state_size, self.action_size)
        elif algorithm == 'actor-critic':
            self.agent = ActorCriticAgent(self.state_size, self.action_size)

        # CartPole states are continuous vectors, not discrete indices,
        # so bypass the one-hot encoding that was used for the grid world
        self.agent.one_hot_encode = lambda s: np.asarray(s, dtype=np.float32)

    def train(self, episodes=500):
        """Train the agent."""
        print(f"Training {self.algorithm} agent on CartPole...")

        for episode in range(episodes):
            state = self.env.reset()
            total_reward = 0
            done = False

            while not done:
                if self.algorithm == 'dqn':
                    action = self.agent.choose_action(state)
                    next_state, reward, done, _ = self.env.step(action)
                    self.agent.remember(state, action, reward, next_state, done)
                    self.agent.replay()
                else:
                    action = self.agent.choose_action(state)
                    next_state, reward, done, _ = self.env.step(action)
                    self.agent.rewards.append(reward)

                state = next_state
                total_reward += reward

            # Policy gradient methods update once per episode
            if self.algorithm != 'dqn':
                if self.algorithm == 'reinforce':
                    self.agent.update_policy()
                else:
                    self.agent.update()

            self.agent.episode_rewards.append(total_reward)

            if episode % 50 == 0:
                avg_reward = np.mean(self.agent.episode_rewards[-50:])
                print(f"Episode {episode}, Average Reward: {avg_reward:.2f}")

                # CartPole is considered solved at an average reward of 195
                if avg_reward >= 195:
                    print(f"CartPole solved in {episode} episodes!")
                    break

    def test(self, episodes=10):
        """Evaluate the trained agent."""
        print(f"\nTesting {self.algorithm} agent...")
        total_rewards = []

        for episode in range(episodes):
            state = self.env.reset()
            total_reward = 0
            done = False

            while not done:
                if self.algorithm == 'dqn':
                    action = self.agent.choose_action(state, training=False)
                else:
                    action = self.agent.choose_action(state)
                state, reward, done, _ = self.env.step(action)
                total_reward += reward

            total_rewards.append(total_reward)
            print(f"Test Episode {episode + 1}: Reward = {total_reward}")

        print(f"Average Test Reward: {np.mean(total_rewards):.2f}")

# Run each algorithm
algorithms = ['dqn', 'reinforce', 'actor-critic']
results = {}

for algo in algorithms:
    cartpole = CartPoleAgent(algo)
    cartpole.train(episodes=500)
    cartpole.test()
    results[algo] = cartpole.agent.episode_rewards

# Plot the results
plt.figure(figsize=(12, 5))
for algo, rewards in results.items():
    window = 50
    if len(rewards) > window:
        avg = [np.mean(rewards[i:i + window]) for i in range(len(rewards) - window)]
        plt.plot(avg, label=algo)

plt.title("CartPole: Algorithm Comparison")
plt.xlabel("Episode")
plt.ylabel(f"Average Reward (window={window})")
plt.axhline(y=195, color='r', linestyle='--', label='Solved Threshold')
plt.legend()
plt.show()

Advanced Topic: Multi-Agent Reinforcement Learning

class MultiAgentEnvironment:
    """A simple multi-agent environment."""
    def __init__(self, num_agents=2, grid_size=5):
        self.num_agents = num_agents
        self.grid_size = grid_size
        self.agents_pos = [(0, i) for i in range(num_agents)]
        self.goal_pos = (grid_size - 1, grid_size - 1)
        self.obstacles = [(2, 2), (3, 2)]

    def reset(self):
        self.agents_pos = [(0, i) for i in range(self.num_agents)]
        return [self._get_agent_state(i) for i in range(self.num_agents)]

    def _get_agent_state(self, agent_id):
        """State index of a single agent."""
        x, y = self.agents_pos[agent_id]
        return x * self.grid_size + y

    def step(self, actions):
        """Apply every agent's action."""
        rewards = []
        dones = []
        next_states = []

        for i in range(self.num_agents):
            old_pos = self.agents_pos[i]
            action = actions[i]

            # Apply the action
            if action == 0:    # up
                new_pos = (self.agents_pos[i][0], max(0, self.agents_pos[i][1] - 1))
            elif action == 1:  # down
                new_pos = (self.agents_pos[i][0], min(self.grid_size - 1, self.agents_pos[i][1] + 1))
            elif action == 2:  # left
                new_pos = (max(0, self.agents_pos[i][0] - 1), self.agents_pos[i][1])
            elif action == 3:  # right
                new_pos = (min(self.grid_size - 1, self.agents_pos[i][0] + 1), self.agents_pos[i][1])
            else:
                new_pos = self.agents_pos[i]

            # Check for collisions with obstacles and other agents
            if new_pos not in self.obstacles and new_pos not in self.agents_pos[:i] + self.agents_pos[i + 1:]:
                self.agents_pos[i] = new_pos

            # Compute the reward
            if self.agents_pos[i] == self.goal_pos:
                reward = 10
                done = True
            elif self.agents_pos[i] == old_pos:
                reward = -1
                done = False
            else:
                # Team shaping reward: stay close to the other agents
                min_dist = min([abs(self.agents_pos[i][0] - ap[0]) + abs(self.agents_pos[i][1] - ap[1])
                                for j, ap in enumerate(self.agents_pos) if i != j])
                reward = 0.1 * (4 - min_dist)  # the closer, the higher the reward
                done = False

            rewards.append(reward)
            dones.append(done)
            next_states.append(self._get_agent_state(i))

        return next_states, rewards, dones

# Multi-agent training
class MultiAgentTrainer:
    def __init__(self, env, num_agents):
        self.env = env
        self.num_agents = num_agents
        self.agents = []

        # One independent DQN per agent
        for _ in range(num_agents):
            agent = DQNAgent(env.grid_size * env.grid_size, 4)
            agent.epsilon = 0.1
            self.agents.append(agent)

    def train(self, episodes=1000):
        """Train the multi-agent system."""
        for episode in range(episodes):
            states = self.env.reset()
            total_rewards = [0] * self.num_agents
            done = False
            steps = 0

            while not done and steps < 100:  # cap the episode length
                actions = []
                for i in range(self.num_agents):
                    action = self.agents[i].choose_action(states[i])
                    actions.append(action)

                next_states, rewards, dones = self.env.step(actions)

                for i in range(self.num_agents):
                    self.agents[i].remember(states[i], actions[i], rewards[i],
                                            next_states[i], dones[i])
                    self.agents[i].replay()
                    total_rewards[i] += rewards[i]

                states = next_states
                done = any(dones)  # the episode ends as soon as any agent reaches the goal
                steps += 1

            if episode % 100 == 0:
                avg_reward = np.mean(total_rewards)
                print(f"Episode {episode}, Average Team Reward: {avg_reward:.2f}")

# Train the multi-agent system
multi_env = MultiAgentEnvironment(num_agents=2, grid_size=5)
trainer = MultiAgentTrainer(multi_env, 2)
trainer.train(episodes=500)

Summary

This article has walked through reinforcement learning from the basics to advanced topics, including:

  1. Core concepts: the key elements and framework of reinforcement learning
  2. Value-based methods: classic algorithms such as Q-Learning and SARSA
  3. Deep reinforcement learning: DQN and its improved variants
  4. Policy gradient methods: REINFORCE and Actor-Critic
  5. A hands-on application: implementing agents for the CartPole environment
  6. An advanced topic: multi-agent reinforcement learning

Reinforcement learning is a vibrant research field that keeps evolving. From simple tabular methods to deep networks, and from single agents to multi-agent systems, it is tackling increasingly complex problems.

Practical Tips

  1. Start with simple environments and increase the complexity gradually
  2. Understand the mathematics behind each algorithm
  3. Experiment with different hyperparameter combinations (a minimal sweep sketch follows this list)
  4. Use appropriate visualization tools to understand the learning process
  5. Take on challenges on platforms such as OpenAI Gym and DeepMind Control
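
As a starting point for tip 3, a minimal sketch of a grid search over the tabular Q-Learning agent defined earlier might look like the following; the parameter grids and the 300-episode budget are arbitrary choices for illustration.

# Minimal hyperparameter sweep for the tabular Q-Learning agent (illustrative)
import itertools
import numpy as np

results = {}
for lr, eps in itertools.product([0.05, 0.1, 0.5], [0.05, 0.1, 0.3]):
    env = GridWorld(5, 5)
    sweep_agent = QLearningAgent(state_space=env.observation_space,
                                 action_space=env.action_space,
                                 learning_rate=lr, discount_factor=0.95, epsilon=eps)
    sweep_agent.train(env, episodes=300)
    # Score each configuration by its average reward over the last 100 episodes
    results[(lr, eps)] = np.mean(sweep_agent.episode_rewards[-100:])

best = max(results, key=results.get)
print(f"Best (learning_rate, epsilon): {best}, avg reward: {results[best]:.2f}")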