✅博主简介:擅长数据搜集与处理、建模仿真、程序设计、仿真代码、论文写作与指导,毕业论文、期刊论文经验交流。
✅成品或者定制,扫描文章底部微信二维码。
(1)基于K线图表征的时序数据转换与技术指标融合方法
传统的大气污染时序预测方法通常直接将原始的污染物浓度数据输入深度学习模型进行训练和预测,这种方式虽然简单直观,但往往忽略了时序数据内在的波动特征和趋势变化规律。借鉴金融领域K线分析的成熟理论体系,本研究设计了一种将大气污染物浓度时序数据转换为K线图形式的表征方法。具体而言,以固定时间窗口(如每小时或每日)为单位,提取该时段内污染物浓度的开盘值(起始浓度)、收盘值(终止浓度)、最高值和最低值四个关键特征,构成一根完整的K线实体。K线的实体部分反映了该时段内污染物浓度的整体变化方向,而上下影线则刻画了浓度的波动幅度和极端值出现情况。这种表征方式相比原始数据具有显著优势:首先,K线图能够压缩数据维度的同时保留关键的趋势信息;其次,影线属性能够有效捕捉污染物浓度的突变信号,这对于预测突发性污染事件具有重要价值。在K线表征的基础上,本研究进一步引入了移动平均线、相对强弱指标、布林带等技术指标来增强数据的特征表达能力。移动平均线能够平滑短期噪声并揭示中长期趋势,相对强弱指标可以量化当前污染浓度相对于历史水平的位置,布林带则能够识别异常波动区间。这些技术指标与K线图形成互补,共同构建了一个多维度的特征表示空间,为后续的深度学习模型提供了更加丰富和稳健的输入特征。实验验证表明,经过K线化处理后的数据在短期预测任务中的均方根误差相比原始数据直接输入的方法降低了近一半,特别是在污染物浓度发生剧烈变化的时段,K线表征方法能够更及时地捕捉变化趋势,有效缓解了传统模型因滞后性导致的预测不准确问题。
(2)基于历史相似模式搜索的多步预测误差抑制策略
多步预测是大气污染预测领域的重要应用场景,因为环境管理部门通常需要提前数小时甚至数天获知污染趋势以制定应对措施。然而,传统的深度学习多步预测方法通常采用迭代策略,即将单步预测结果作为下一步预测的输入,这种方式会导致预测误差随步数增加而不断累积放大,最终使得远期预测结果严重偏离实际值。为解决这一难题,本研究提出了基于K线模式匹配的历史相似序列搜索方法。该方法的核心思想是利用历史数据中与当前K线形态相似的时段作为参照,借助历史数据中已知的后续演化趋势来指导当前的多步预测。具体实现过程中,首先需要定义K线序列之间的相似性度量准则,本研究综合考虑了K线实体长度、影线比例、连续K线的组合形态等多个维度,采用加权欧氏距离计算当前观测窗口内K线序列与历史库中所有候选序列的相似度得分。对于相似度最高的若干历史序列,提取其后续时段的实际演化轨迹,通过加权平均或核密度估计方法融合这些历史轨迹信息,生成对未来多步的趋势预测。这种基于模式匹配的预测方法本质上是一种后验全局特征的融入策略,它不依赖于迭代预测过程,因此能够从根本上避免误差累积问题。实验结果显示,在长期预测任务中,基于模式匹配的方法相比传统迭代预测方法的均方根误差降低了超过一半,尤其在预测步数超过六步后的优势更加明显,证明了该方法在抑制多步预测误差累积方面的有效性。
(3)融合深度学习的特征增强与模型轻量化设计
尽管模式匹配方法在捕获全局趋势特征方面表现出色,但传统的模式匹配技术在处理复杂、高维的K线特征时仍存在能力不足的问题。为此,本研究构建了融入深度学习方法的特征提取模型,以增强模式匹配过程中的特征表达能力。考虑到匹配得到的相似历史K线序列转换为图像后像素点相对较少,直接输入深度神经网络可能导致特征不明显的问题,本研究首先对K线图像进行了特征增强处理。具体包括:采用双线性插值或超分辨率重建方法提升K线图像的分辨率;通过对比度增强算法突出K线实体和影线的边界特征;引入边缘检测算子强化K线形态的几何轮廓信息。经过增强处理后的K线图像特征更加鲜明,有利于深度神经网络的有效学习。在网络结构设计方面,本研究采用了卷积神经网络作为特征提取的骨干网络,但针对K线图像的特殊性质进行了专门优化。为了减少模型的参数量和计算复杂度,本研究在网络的高层特征图上引入了全局平均池化操作,该操作能够将每个特征通道压缩为单个数值,在保留主要特征信息的同时大幅降低了后续全连接层的参数规模。
import numpy as np import pandas as pd from sklearn.preprocessing import MinMaxScaler from sklearn.metrics import mean_squared_error import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader, TensorDataset from scipy.spatial.distance import cdist class KLineConverter: def __init__(self, window_size=24): self.window_size = window_size def convert_to_kline(self, series): n_klines = len(series) // self.window_size klines = [] for i in range(n_klines): window = series[i*self.window_size:(i+1)*self.window_size] open_val = window[0] close_val = window[-1] high_val = np.max(window) low_val = np.min(window) klines.append([open_val, high_val, low_val, close_val]) return np.array(klines) def compute_technical_indicators(self, klines, ma_period=5): closes = klines[:, 3] ma = np.convolve(closes, np.ones(ma_period)/ma_period, mode='valid') ma = np.pad(ma, (ma_period-1, 0), mode='edge') delta = np.diff(closes, prepend=closes[0]) gain = np.where(delta > 0, delta, 0) loss = np.where(delta < 0, -delta, 0) avg_gain = np.convolve(gain, np.ones(14)/14, mode='same') avg_loss = np.convolve(loss, np.ones(14)/14, mode='same') rs = avg_gain / (avg_loss + 1e-8) rsi = 100 - (100 / (1 + rs)) return np.column_stack([klines, ma, rsi]) class PatternMatcher: def __init__(self, history_data, pattern_length=10): self.history = history_data self.pattern_length = pattern_length def find_similar_patterns(self, current_pattern, top_k=5): n_candidates = len(self.history) - self.pattern_length - 1 distances = [] for i in range(n_candidates): candidate = self.history[i:i+self.pattern_length] dist = np.sqrt(np.sum((current_pattern - candidate)**2)) distances.append((i, dist)) distances.sort(key=lambda x: x[1]) return [d[0] for d in distances[:top_k]] def predict_from_patterns(self, indices, steps_ahead): future_values = [] for idx in indices: future_start = idx + self.pattern_length future_end = future_start + steps_ahead if future_end <= len(self.history): future_values.append(self.history[future_start:future_end, 3]) if future_values: return np.mean(future_values, axis=0) return None class KLineFeatureExtractor(nn.Module): def __init__(self, input_channels=6, hidden_dim=64): super(KLineFeatureExtractor, self).__init__() self.conv1 = nn.Conv1d(input_channels, 32, kernel_size=3, padding=1) self.conv2 = nn.Conv1d(32, 64, kernel_size=3, padding=1) self.conv3 = nn.Conv1d(64, hidden_dim, kernel_size=3, padding=1) self.pool = nn.AdaptiveAvgPool1d(1) self.fc = nn.Linear(hidden_dim, 32) self.output = nn.Linear(32, 1) self.relu = nn.ReLU() self.dropout = nn.Dropout(0.3) def forward(self, x): x = x.permute(0, 2, 1) x = self.relu(self.conv1(x)) x = self.relu(self.conv2(x)) x = self.relu(self.conv3(x)) x = self.pool(x).squeeze(-1) x = self.dropout(self.relu(self.fc(x))) return self.output(x) class PM25Predictor: def __init__(self, seq_length=10, pred_horizon=6): self.seq_length = seq_length self.pred_horizon = pred_horizon self.scaler = MinMaxScaler() self.model = KLineFeatureExtractor() self.converter = KLineConverter() def prepare_sequences(self, kline_data): X, y = [], [] for i in range(len(kline_data) - self.seq_length - self.pred_horizon): X.append(kline_data[i:i+self.seq_length]) y.append(kline_data[i+self.seq_length, 3]) return np.array(X), np.array(y) def train(self, train_data, epochs=100, batch_size=32, lr=0.001): klines = self.converter.convert_to_kline(train_data) klines_with_indicators = self.converter.compute_technical_indicators(klines) scaled_data = self.scaler.fit_transform(klines_with_indicators) X, y = self.prepare_sequences(scaled_data) X_tensor = torch.FloatTensor(X) y_tensor = torch.FloatTensor(y).unsqueeze(1) dataset = TensorDataset(X_tensor, y_tensor) loader = DataLoader(dataset, batch_size=batch_size, shuffle=True) optimizer = optim.Adam(self.model.parameters(), lr=lr) criterion = nn.MSELoss() self.model.train() for epoch in range(epochs): total_loss = 0 for batch_x, batch_y in loader: optimizer.zero_grad() outputs = self.model(batch_x) loss = criterion(outputs, batch_y) loss.backward() optimizer.step() total_loss += loss.item() def predict_multistep(self, test_data, use_pattern_matching=True): klines = self.converter.convert_to_kline(test_data) klines_with_indicators = self.converter.compute_technical_indicators(klines) scaled_data = self.scaler.transform(klines_with_indicators) self.model.eval() predictions = [] if use_pattern_matching: matcher = PatternMatcher(scaled_data, self.seq_length) current_pattern = scaled_data[-self.seq_length:] similar_indices = matcher.find_similar_patterns(current_pattern) pattern_pred = matcher.predict_from_patterns(similar_indices, self.pred_horizon) if pattern_pred is not None: predictions.extend(pattern_pred.tolist()) return np.array(predictions)如有问题,可以直接沟通
👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇