## Function Description

This code implements a quantitative trading strategy based on reinforcement learning (RL) and a Long Short-Term Memory (LSTM) network. The strategy uses an LSTM model to extract features from historical price data, then trains an agent with an RL algorithm (such as DQN or PPO) to make buy/sell decisions from the observed market state. The core challenge is designing a sound reward function that ties trading signals to market feedback, so that the strategy's return-to-risk ratio can be optimized. The approach is suited to automated trading of financial time series such as stocks and futures, but note its dependence on historical data and the risk of overfitting.
## Benefits and Risk Analysis

### Benefits

- **Adaptive learning**: the LSTM captures nonlinear relationships in the price series, and reinforcement learning gives the strategy the ability to adjust dynamically.
- **Risk control**: stop-loss and take-profit logic can be folded naturally into the reward function design.
- **Multi-dimensional decisions**: price/volume indicators, volatility, and other features can be combined as inputs.
- **Strategy iteration**: an online-learning mechanism lets the strategy keep adapting to market changes.

### Risks

- **Data dependence**: performance depends heavily on the quality and completeness of the historical data.
- **Parameter sensitivity**: badly chosen reward-function weights or LSTM hyperparameters can render the strategy useless.
- **Black-box behavior**: deep learning models lack interpretability, which makes attribution analysis difficult.
- **Market regime shifts**: extreme market conditions can trigger abnormal trading behavior.
- **Computational cost**: training requires GPU acceleration, and live trading carries latency risk.

## System Architecture Design

### 1. Data Preprocessing Module

```python
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler


class DataPreprocessor:
    def __init__(self, lookback_window=60):
        self.lookback_window = lookback_window
        self.scaler = MinMaxScaler(feature_range=(-1, 1))

    def prepare_data(self, df, features=['close', 'volume']):
        """Process the raw data and build a normalized feature matrix."""
        # Compute technical indicators
        df = self._add_technical_indicators(df)
        # Select the target features
        data = df[features].values
        # Normalize
        scaled_data = self.scaler.fit_transform(data)
        # Build sliding-window samples
        X, y = [], []
        for i in range(len(scaled_data) - self.lookback_window):
            X.append(scaled_data[i:i + self.lookback_window])
            y.append(scaled_data[i + self.lookback_window, 0])  # predict the close price
        return np.array(X), np.array(y)

    def _add_technical_indicators(self, df):
        """Add common technical indicators."""
        # RSI
        delta = df['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        df['rsi'] = 100 - (100 / (1 + (gain / loss)))
        # MACD
        exp1 = df['close'].ewm(span=12, adjust=False).mean()
        exp2 = df['close'].ewm(span=26, adjust=False).mean()
        df['macd'] = exp1 - exp2
        df['signal_line'] = df['macd'].ewm(span=9, adjust=False).mean()
        # Volume-weighted average price
        df['vwap'] = (df['close'] * df['volume']).cumsum() / df['volume'].cumsum()
        return df.dropna()
```

### 2. LSTM Feature Encoder

```python
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization


class LSTMFeatureEncoder:
    def __init__(self, input_shape, units=128, dropout_rate=0.2):
        self.model = self._build_model(input_shape, units, dropout_rate)

    def _build_model(self, input_shape, units, dropout_rate):
        """Build the LSTM feature-extraction network."""
        model = Sequential([
            LSTM(units, return_sequences=True, input_shape=input_shape),
            BatchNormalization(),
            Dropout(dropout_rate),
            LSTM(units // 2, return_sequences=False),
            BatchNormalization(),
            Dropout(dropout_rate),
            Dense(units // 4, activation='relu'),
            Dense(1, activation='linear')  # output the latent price trend
        ])
        model.compile(optimizer='adam', loss='mse')
        return model

    def train(self, X_train, y_train, epochs=50, batch_size=32):
        """Train the LSTM encoder."""
        early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)
        self.model.fit(
            X_train, y_train,
            validation_split=0.1,
            epochs=epochs,
            batch_size=batch_size,
            callbacks=[early_stop],
            verbose=0
        )

    def extract_features(self, X):
        """Return the LSTM-encoded feature representation."""
        return self.model.predict(X, verbose=0)
```
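Before moving on to the RL environment, the two components above can be chained together. The following is a minimal usage sketch, not part of the original article; it assumes the same `daily_stock_data.csv` file used later in the main program, with `close` and `volume` columns, and an 80/20 chronological split.

```python
# Minimal usage sketch (assumed file name and column layout).
import pandas as pd

df = pd.read_csv('daily_stock_data.csv')           # expects 'close' and 'volume' columns
preprocessor = DataPreprocessor(lookback_window=60)
X, y = preprocessor.prepare_data(df, features=['close', 'volume'])

encoder = LSTMFeatureEncoder(input_shape=(X.shape[1], X.shape[2]))
split = int(0.8 * len(X))                          # simple chronological train/test split
encoder.train(X[:split], y[:split])
latent = encoder.extract_features(X[split:])       # shape: (n_samples, 1)
print(latent.shape)
```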
### 3. Reinforcement Learning Environment

```python
import gym
from gym import spaces
import numpy as np


class TradingEnv(gym.Env):
    def __init__(self, price_data, initial_balance=10000, transaction_cost=0.001):
        super(TradingEnv, self).__init__()
        # Action space: 0 = hold, 1 = buy, 2 = sell
        self.action_space = spaces.Discrete(3)
        # Observation space: market features (price, RSI, MACD, ...) plus portfolio state
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(price_data.shape[1] + 3,)
        )
        self.price_data = price_data
        self.initial_balance = initial_balance
        self.transaction_cost = transaction_cost
        self.reset()

    def reset(self):
        self.current_step = 0
        self.portfolio_value = self.initial_balance
        self.cash = self.initial_balance
        self.shares = 0
        self.max_drawdown = 0
        self.trade_history = []
        return self._get_obs()

    def _get_obs(self):
        """Return the current market state plus the portfolio state."""
        market_state = self.price_data[self.current_step]
        portfolio_state = [self.cash, self.shares, self.portfolio_value]
        return np.concatenate([market_state, portfolio_state])

    def step(self, action):
        # Execute the trade
        prev_cash = self.cash
        prev_shares = self.shares
        price = self.price_data[self.current_step, 0]
        if action == 1:  # buy with at most 10% of portfolio value
            buy_amount = min(self.cash, self.portfolio_value * 0.1)
            self.shares += buy_amount / (price * (1 + self.transaction_cost))
            self.cash -= buy_amount
        elif action == 2:  # sell at most 10% of portfolio value
            sell_amount = min(self.shares * price, self.portfolio_value * 0.1)
            self.shares -= sell_amount / (price * (1 - self.transaction_cost))
            self.cash += sell_amount

        # Update the portfolio value
        self.portfolio_value = self.cash + self.shares * price

        # Track the maximum drawdown
        peak = max(self.portfolio_value, self._get_peak())
        current_drawdown = (peak - self.portfolio_value) / peak
        self.max_drawdown = max(self.max_drawdown, current_drawdown)

        # Record the trade history
        self.trade_history.append({
            'step': self.current_step,
            'action': action,
            'price': price,
            'portfolio_value': self.portfolio_value
        })

        # The episode ends at the last price bar
        done = self.current_step >= len(self.price_data) - 1

        # Compute the reward
        reward = self._calculate_reward(prev_cash, prev_shares, action)

        if not done:
            self.current_step += 1
        return self._get_obs(), reward, done, {"max_drawdown": self.max_drawdown}

    def _calculate_reward(self, prev_cash, prev_shares, action):
        """Multi-component reward function."""
        # Profit component
        profit_reward = (self.portfolio_value - self.initial_balance) / self.initial_balance
        # Risk penalty
        risk_penalty = self.max_drawdown
        # Transaction-cost penalty (only when a trade was made)
        transaction_penalty = self.transaction_cost if action != 0 else 0
        # Sharpe-ratio adjustment
        sharpe_ratio = self._calculate_sharpe_ratio()
        # Combined reward
        return profit_reward - risk_penalty - transaction_penalty + sharpe_ratio * 0.1

    def _calculate_sharpe_ratio(self, risk_free_rate=0.02, periods=252):
        """Annualized Sharpe ratio of the step-by-step portfolio returns."""
        returns = []
        for i in range(1, len(self.trade_history)):
            prev_value = self.trade_history[i - 1]['portfolio_value']
            curr_value = self.trade_history[i]['portfolio_value']
            returns.append((curr_value - prev_value) / prev_value)
        if len(returns) < 2:
            return 0
        mean_return = np.mean(returns) * periods
        std_return = np.std(returns) * np.sqrt(periods)
        return (mean_return - risk_free_rate) / std_return if std_return != 0 else 0

    def _get_peak(self):
        """Highest portfolio value seen so far."""
        return max(
            (trade['portfolio_value'] for trade in self.trade_history),
            default=self.initial_balance
        )
```

## Reward Function Design Principles

### 1. Multi-Dimensional Balancing

An effective reward function should balance the following factors:
- **Profit factor**: absolute return (final_value - initial_value)
- **Risk factor**: maximum drawdown, volatility, VaR
- **Efficiency factor**: Sharpe ratio, information ratio
- **Cost factor**: trading frequency, slippage losses
- **Stability factor**: standard deviation of the return distribution

### 2. Dynamic Weight Adjustment

```python
import numpy as np


class DynamicRewardScheduler:
    def __init__(self, base_weights={'profit': 0.4, 'risk': 0.3,
                                     'efficiency': 0.2, 'cost': 0.1}):
        self.base_weights = base_weights
        self.current_weights = base_weights.copy()

    def update_weights(self, training_progress, market_volatility):
        """Adjust the weights based on training progress and market volatility."""
        # Gradually increase the risk-control weight as training progresses
        progress_factor = min(training_progress / 100, 1.0)
        self.current_weights['risk'] = self.base_weights['risk'] * (1 + progress_factor)
        self.current_weights['profit'] = self.base_weights['profit'] * (1 - progress_factor / 2)
        # Scale the efficiency weight with market volatility
        volatility_factor = np.clip(market_volatility / 0.2, 0.5, 2.0)
        self.current_weights['efficiency'] *= volatility_factor
        # Renormalize so that the weights sum to 1
        total = sum(self.current_weights.values())
        for key in self.current_weights:
            self.current_weights[key] /= total
```

### 3. Penalty Mechanism Design

| Violation type | Penalty scheme | Mathematical form |
| --- | --- | --- |
| Over-trading | Linearly increasing penalty | penalty = k * num_trades |
| Excessive position concentration | Quadratic penalty | penalty = c * position_concentration² |
| Stop-loss violation | Fixed proportional deduction | penalty = stop_loss_violation * portfolio_value |
| Insufficient liquidity | Market-impact simulation | penalty = slippage * order_size |
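The table above can be expressed in code roughly as follows. This is an illustrative sketch, not part of the original article; the coefficients `k`, `c`, and `stop_loss_rate`, as well as the input arguments, are assumed names and values.

```python
def compute_penalty(num_trades, position_concentration, stop_loss_violated,
                    portfolio_value, slippage, order_size,
                    k=0.001, c=0.5, stop_loss_rate=0.02):
    """Illustrative penalty term combining the four mechanisms in the table."""
    penalty = k * num_trades                          # over-trading: linear
    penalty += c * position_concentration ** 2        # concentration: quadratic
    if stop_loss_violated:
        penalty += stop_loss_rate * portfolio_value   # stop-loss: fixed proportion
    penalty += slippage * order_size                  # liquidity: impact cost
    return penalty
```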
## Input Parameter Mapping

### 1. Mapping Technical Indicators to LSTM Inputs

| Technical indicator | Meaning | LSTM input dimension | Normalization range |
| --- | --- | --- | --- |
| Close-price series | Price trend | 60-dim vector | [-1, 1] |
| RSI | Overbought / oversold | 1-dim scalar | [0, 1] |
| MACD histogram | Momentum change | 1-dim scalar | [-2, 2] |
| Volume | Market activity | 1-dim scalar | [0, 1] |
| VWAP | Average cost | 1-dim scalar | [0, 1] |
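The per-column ranges in the table can be applied with a separate scaler for each indicator. The sketch below is an assumption for illustration; the column names match the `DataPreprocessor` output, and the ranges mirror the table.

```python
# Sketch: scale each indicator to the range listed in the table above.
import numpy as np
from sklearn.preprocessing import MinMaxScaler

RANGES = {'close': (-1, 1), 'rsi': (0, 1), 'macd': (-2, 2),
          'volume': (0, 1), 'vwap': (0, 1)}

def scale_indicators(df):
    """Return a (n_rows, n_indicators) matrix with per-column normalization."""
    scaled = {}
    for col, rng in RANGES.items():
        scaler = MinMaxScaler(feature_range=rng)
        scaled[col] = scaler.fit_transform(df[[col]].values).ravel()
    return np.column_stack([scaled[c] for c in RANGES])
```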
### 2. Building the RL State Space

```python
import numpy as np


def create_state_space(price_data, technical_indicators, portfolio_state):
    """Build a state vector combining market data and portfolio information."""
    # Market component: price series of the last 60 time steps (assumed time-ordered)
    market_window = price_data[-60:]
    # Technical-indicator snapshot
    indicator_snapshot = np.array([
        technical_indicators['rsi'],
        technical_indicators['macd'],
        technical_indicators['volume']
    ])
    # Portfolio state
    portfolio_vector = np.array([
        portfolio_state['cash'],
        portfolio_state['shares'],
        portfolio_state['portfolio_value']
    ])
    # Concatenate all components
    state_vector = np.concatenate([
        market_window.flatten(),
        indicator_snapshot,
        portfolio_vector
    ])
    return state_vector.astype(np.float32)
```

### 3. Action Space Discretization

| Action | Meaning | Typical scenario | Position-management suggestion |
| --- | --- | --- | --- |
| -1 | Close all positions | Expected decline | Keep ≥ 70% in cash |
| 0 | Hold / wait | High uncertainty | Maintain current position |
| 1 | Full position | Strongly bullish | Use ≤ 30% leverage |
| 2 | Half position | Mild uptrend | Stay flexible |
| 3 | Hedge | High-risk environment | Allocate to inverse ETFs |
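Note that this table describes a richer action set than the three-action `TradingEnv` above. One way to use such actions is to map each one to a target position fraction, as in the sketch below; the fractions and the helper function are illustrative assumptions, not part of the original article.

```python
# Sketch: map the discrete actions in the table to target position fractions.
TARGET_POSITION = {
    -1: 0.0,   # close all positions
     0: None,  # hold: keep the current position unchanged
     1: 1.0,   # full position
     2: 0.5,   # half position
     3: -0.3,  # hedge: e.g. a 30% inverse-ETF / short allocation
}

def target_shares(action, portfolio_value, price, current_shares):
    """Translate a discrete action into a target share count."""
    fraction = TARGET_POSITION[action]
    if fraction is None:
        return current_shares
    return fraction * portfolio_value / price
```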
## Full Strategy Implementation

### 1. Main Program Framework

```python
import pandas as pd
from stable_baselines3 import PPO


def main():
    # 1. Load and preprocess the data
    data_path = 'daily_stock_data.csv'
    df = pd.read_csv(data_path)
    features = ['close', 'volume']
    preprocessor = DataPreprocessor(lookback_window=60)
    X, y = preprocessor.prepare_data(df, features=features)

    # 2. Train the LSTM feature encoder
    lstm_encoder = LSTMFeatureEncoder(input_shape=(60, len(features)))
    split = int(0.8 * len(X))
    lstm_encoder.train(X[:split], y[:split])

    # Extract features for the test set
    test_features = lstm_encoder.extract_features(X[split:])

    # 3. Initialize the trading environment
    # test_features has shape (n_steps, 1); its single column serves as the price series
    price_data = test_features
    env = TradingEnv(price_data)

    # 4. Configure the RL algorithm (PPO as an example)
    model = PPO(
        "MlpPolicy",
        env,
        verbose=1,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        ent_coef=0.0,
        tensorboard_log="./ppo_tensorboard/"
    )

    # 5. Train the agent
    model.learn(total_timesteps=100000, log_interval=10)

    # 6. Backtest and evaluate
    obs = env.reset()
    done = False
    while not done:
        action, _states = model.predict(obs)
        obs, reward, done, info = env.step(action)
        print(f"Action: {action}, Portfolio Value: {env.portfolio_value:.2f}")

    # 7. Save the model
    model.save("trading_agent.zip")


if __name__ == "__main__":
    main()
```

### 2. Key Parameter Tuning Table

| Parameter | Recommended range | Typical value | Effect of increasing |
| --- | --- | --- | --- |
| LSTM units | 64-256 | 128 | more capacity, slower training |
| Dropout rate | 0.1-0.3 | 0.2 | better generalization, weaker fit |
| Discount factor γ | 0.9-0.99 | 0.95 | longer planning horizon |
| Exploration rate ε | 0.01-0.1 | 0.05 | more exploration, less stability |
| Batch size | 32-128 | 64 | more parallelism, higher memory use |
| Learning rate | 1e-4 to 1e-3 | 3e-4 | faster convergence, more oscillation risk |
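For concreteness, the "typical value" column can be plugged into the constructors directly. The sketch below is an assumption for illustration: the discount factor and exploration rate apply to a value-based agent such as DQN rather than PPO, the keyword arguments follow stable_baselines3 and the classes defined above, and `env` is a `TradingEnv` instance built as in `main()`.

```python
# Sketch: applying the typical values from the tuning table.
from stable_baselines3 import DQN

dqn_model = DQN(
    "MlpPolicy",
    env,                          # a TradingEnv instance, as built in main()
    learning_rate=3e-4,           # within the 1e-4 to 1e-3 range
    gamma=0.95,                   # discount factor γ
    batch_size=64,
    exploration_final_eps=0.05,   # exploration rate ε
    verbose=0,
)

lstm_encoder = LSTMFeatureEncoder(
    input_shape=(60, 2),          # 60-step lookback, 2 input features
    units=128,                    # LSTM units
    dropout_rate=0.2,
)
```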