denoise_RC/exp/exp2_comparison.py
"""
实验2: 对比实验 (突出 RC 方法的优势)
目标:
将 RC 降噪方法与其他经典及机器学习降噪方法 (如 Wiener 滤波、卡尔曼滤波、SVR、GBRT、LSTM) 进行对比,
在相同混沌系统、噪声类型、数据划分下评估各方法在降噪和动力学特性恢复方面的表现。
实验设置:
与实验1一致加入不同降噪方法的实现和参数配置。
结果展示:
- 表格展示各方法在不同噪声水平下的指标对比。
- 图表展示 (例如 Lyapunov 指数估计误差的折线图)。
"""
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import wiener
from sklearn.ensemble import GradientBoostingRegressor
import reservoirpy as rpy
import reservoirpy.nodes as rpn

from tools import load_data, visualize_psd_diff, visualize_data_diff, print_exec_time

rpy.verbosity(0)
rpy.set_seed(42)

def calculate_metrics(clean_data, prediction):
    snr = 10 * np.log10(np.mean(clean_data**2) / np.mean((clean_data - prediction)**2))
    rmse = np.sqrt(np.mean((clean_data - prediction)**2))
    # PSNR uses the peak *absolute* signal value, since the channels can be negative.
    psnr = 20 * np.log10(np.max(np.abs(clean_data)) / rmse)
    # Flatten both arrays so corrcoef returns a single scalar. Indexing [0, 1] on
    # the raw (n_samples, 3) arrays would correlate the first two rows of
    # clean_data with each other, which is why every method previously reported
    # the identical correlation value.
    corr_coef = np.corrcoef(clean_data.ravel(), prediction.ravel())[0, 1]
    return snr, rmse, psnr, corr_coef

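# Sanity check (hypothetical data, not from the experiment): a near-perfect
# reconstruction should give RMSE ~ 0, correlation ~ 1, and large SNR/PSNR:
#   x = np.random.randn(1000, 3)
#   print(calculate_metrics(x, x + 1e-6 * np.random.randn(*x.shape)))
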
@print_exec_time
def test_esn(clean_data, noisy_data, depth, **kwargs):
    # Split into training and test sets by percentage.
    train_size = int(len(clean_data) * 0.8)
    train_clean_data = clean_data[:train_size]
    train_noisy_data = noisy_data[:train_size]
    test_clean_data = clean_data[train_size:]
    test_noisy_data = noisy_data[train_size:]
    # All arrays have shape (n_samples, n_dimensions) -> (40000, 3).

    def build_esn_model(units, lr, sr, reg, output_dim, reservoir_node=None):
        if reservoir_node is None:
            reservoir_node = rpn.Reservoir(units=units, lr=lr, sr=sr)
        output_node = rpn.Ridge(output_dim=output_dim, ridge=reg)
        model = reservoir_node >> output_node
        return model, reservoir_node

    def train_esn(model, clean_data, noisy_data, warmup=1000):
        model.fit(noisy_data, clean_data, warmup=warmup)
        prediction = model.run(noisy_data)
        return model, prediction

    def validate_esn(model, noisy_data):
        prediction = model.run(noisy_data)
        return prediction

    results = []
    lv1_units = kwargs.get('lv1_units', 300)
    lv1_lr = kwargs.get('lv1_lr', 0.3)
    lv1_sr = kwargs.get('lv1_sr', 0.9)
    lv1_reg = kwargs.get('lv1_reg', 1e-6)
    lv1_output_dim = kwargs.get('lv1_output_dim', 3)
    lv1_model, reservoir_node = build_esn_model(lv1_units, lv1_lr, lv1_sr, lv1_reg, lv1_output_dim)
    lv1_model, lv1_prediction = train_esn(lv1_model, train_clean_data, train_noisy_data)
    lv1_validation = validate_esn(lv1_model, test_noisy_data)
    lv1_metrics = calculate_metrics(test_clean_data, lv1_validation)
    results.append(lv1_metrics)
    # Track the deepest level's outputs. Initializing here (rather than inside an
    # `if depth > 1` block) keeps the final visualization and return value defined
    # when depth == 1, which previously raised a NameError.
    lvi_train_prediction = lv1_prediction
    lvi_validation = lv1_validation
    for i in range(2, depth + 1):
        lvi_units = kwargs.get(f'lv{i}_units', 300)
        lvi_lr = kwargs.get(f'lv{i}_lr', 0.3)
        lvi_sr = kwargs.get(f'lv{i}_sr', 0.9)
        lvi_reg = kwargs.get(f'lv{i}_reg', 1e-6)
        lvi_output_dim = kwargs.get(f'lv{i}_output_dim', 3)
        lvi_model, _ = build_esn_model(lvi_units, lvi_lr, lvi_sr, lvi_reg, lvi_output_dim, reservoir_node)
        lvi_model, lvi_train_prediction = train_esn(lvi_model, train_clean_data, lvi_train_prediction)
        lvi_validation = validate_esn(lvi_model, lvi_validation)
        lvi_metrics = calculate_metrics(test_clean_data, lvi_validation)
        results.append(lvi_metrics)
    visualize_data_diff(test_clean_data, test_noisy_data, lvi_validation, warmup=0, title_prefix='ESN')
    return results, lvi_validation

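# Hypothetical usage: per-level hyperparameters are picked up from kwargs via the
# f'lv{i}_*' keys, so a 2-level deep ESN with a larger second reservoir would be
#   results, pred = test_esn(clean_data, noisy_data, depth=2, lv2_units=500, lv2_sr=1.1)
# where results[i] is the (SNR, RMSE, PSNR, corr) tuple measured after level i+1.
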
@print_exec_time
def test_kalman(clean_data, noisy_data, process_variance=1e-5, measurement_variance=1e-1):
    # Assume a 1D Kalman model per channel:
    #   x(k+1) = x(k) + process noise,  z(k) = x(k) + measurement noise
    # Each channel is filtered independently.
    # Parameter sweep: process_variance in {1e-5, 1e-4, 1e-3},
    #                  measurement_variance in {1e-3, 1e-2, 1e-1}
    # 1e-5, 1e-3 -> Kalman Metrics: (5.690448393281406 , 0.5213188005972011, 14.43781885284762 , 0.9999978828900199)
    # 1e-3, 1e-1 -> Kalman Metrics: (5.6905031524143475, 0.5213155140166996, 14.437873611980564, 0.9999978828900199)
    n_samples = noisy_data.shape[0]
    filtered_data = np.zeros_like(noisy_data)
    for ch in range(noisy_data.shape[1]):
        x_est = 0.0  # initial state estimate
        p_est = 1.0  # initial estimate covariance
        for k in range(n_samples):
            # Prediction step
            x_pred = x_est
            p_pred = p_est + process_variance
            # Update step
            z_k = noisy_data[k, ch]  # measurement
            K = p_pred / (p_pred + measurement_variance)
            x_est = x_pred + K * (z_k - x_pred)
            p_est = (1 - K) * p_pred
            filtered_data[k, ch] = x_est
    metrics = calculate_metrics(clean_data, filtered_data)
    return metrics, filtered_data

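# Back-of-envelope note (not from the source): with constant q = process_variance
# and r = measurement_variance, the scalar Riccati recursion converges, with fixed
# point P = (-q + sqrt(q**2 + 4*q*r)) / 2 and steady-state gain
# K_ss = (P + q) / (P + q + r). For (q, r) = (1e-5, 1e-1), K_ss ~ 0.01, i.e. the
# filter behaves like an exponential smoother with alpha ~ 0.01.
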
@print_exec_time
def test_wiener(clean_data, noisy_data, mysize=35):
    # Parameter sweep over mysize:
    # mysize = 5  -> Wiener Metrics: (7.082912643590872 , 0.4440993882054787, 15.830283103157086, 0.9999978828900199)
    # mysize = 7  -> Wiener Metrics: (8.393434517491048 , 0.3819038892573478, 17.14080497705726 , 0.9999978828900199)
    # mysize = 9  -> Wiener Metrics: (9.396860794197918 , 0.3402379612434453, 18.144231253764133, 0.9999978828900199)
    # mysize = 11 -> Wiener Metrics: (10.142598565197332, 0.3122452775883699, 18.889969024763545, 0.9999978828900199)
    # mysize = 15 -> Wiener Metrics: (11.128377345656945, 0.2787449061849072, 19.87574780522316 , 0.9999978828900199)
    # mysize = 25 -> Wiener Metrics: (11.145673899312722, 0.27819038279543434, 19.89304435887894, 0.9999978828900199)
    filtered_data = np.zeros_like(noisy_data)
    for i in range(noisy_data.shape[1]):
        filtered_data[:, i] = wiener(noisy_data[:, i], mysize=mysize)
    metrics = calculate_metrics(clean_data, filtered_data)
    return metrics, filtered_data

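# Note: scipy.signal.wiener estimates the local mean and variance in a sliding
# window of length mysize and shrinks each sample toward the local mean in
# proportion to the estimated noise power, so larger windows smooth more
# aggressively (consistent with the sweep above, which flattens out near mysize = 25).
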
@print_exec_time
def test_gbrt(clean_data, noisy_data, max_depth=13):
    '''
    Parameter sweep over max_depth:
    dep-13 -> GBRT Metrics: (5.187844922790017, 0.5523744230155584, 13.935215382356231, 0.9999978828900199)
    dep-11 -> GBRT Metrics: (5.408985965603853, 0.5384885940824224, 14.156356425170069, 0.9999978828900199)
    dep-9  -> GBRT Metrics: (5.298970192515195, 0.5453524862821368, 14.046340652081408, 0.9999978828900199)
    dep-7  -> GBRT Metrics: (4.410332845010704, 0.604100457756635 , 13.15770330457692 , 0.9999978828900199)
    '''
    gbrt = GradientBoostingRegressor(n_estimators=100, learning_rate=0.1, max_depth=max_depth, random_state=42)
    filtered_data = np.zeros_like(noisy_data)  # same shape as noisy_data
    x_vals_all = np.arange(len(noisy_data)).reshape(-1, 1)  # time indices of the full dataset
    for i in range(noisy_data.shape[1]):  # iterate over channels
        gbrt.fit(x_vals_all, noisy_data[:, i])  # train on the *entire* dataset; target is a *single channel* of noisy_data
        filtered_data[:, i] = gbrt.predict(x_vals_all)  # predict the *entire* dataset from the time indices
    metrics = calculate_metrics(clean_data, filtered_data)
    visualize_data_diff(clean_data, noisy_data, filtered_data, warmup=0, title_prefix='GBRT_FullData')  # distinct title_prefix
    return metrics, filtered_data

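# Note: this baseline regresses each channel on its integer time index over the
# full series, so it is a global curve fit (smoothing) rather than a causal
# filter, and its metrics are computed in-sample on the very data it was fit to.
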
@print_exec_time
def test_moving_average(clean_data, noisy_data, window_size=10):
    # window_size = 10 -> Moving Average Metrics: (7.742861478461627, 0.4116069821396233, 16.49023193802784 , 0.9999978828900199)
    # window_size = 15 -> Moving Average Metrics: (5.702762184458747, 0.5205802622393892, 14.450132644024961, 0.9999978828900199)
    filtered_data = np.zeros_like(noisy_data)
    # Apply a moving-average filter to each channel.
    for i in range(noisy_data.shape[1]):
        cumsum = np.cumsum(np.insert(noisy_data[:, i], 0, 0))
        avg = (cumsum[window_size:] - cumsum[:-window_size]) / window_size
        # Assign from index window_size-1 onward so the shapes line up.
        filtered_data[window_size-1:, i] = avg
        # Warm-up region: average over however many samples are available so far.
        for j in range(window_size - 1):
            filtered_data[j, i] = np.mean(noisy_data[0:j+1, i])
    metrics = calculate_metrics(clean_data, filtered_data)
    return metrics, filtered_data

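# Equivalent sketch using convolution (matches the cumulative-sum trick on the
# fully-overlapping region; the explicit loop above handles the warm-up edge):
#   kernel = np.ones(window_size) / window_size
#   avg = np.convolve(noisy_data[:, i], kernel, mode='valid')
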
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--mode', type=str, default='wiener', help='which denoising method to test')
    args = parser.parse_args()

    # Load and preprocess the data.
    clean_data, noisy_data = load_data(system='lorenz', noise='gaussian',
                                       intensity=0.8, init=[1, 1, 1],
                                       n_timesteps=30000, transient=10000, h=0.01)

    if args.mode in ('gbrt', 'all'):
        gbrt_metrics, gbrt_prediction = test_gbrt(clean_data, noisy_data)
        print("GBRT Metrics:", gbrt_metrics)
        # visualize_data_diff(clean_data, noisy_data, gbrt_prediction, warmup=0, title_prefix='GBRT')
    if args.mode in ('kalman', 'all'):
        kalman_metrics, kalman_prediction = test_kalman(clean_data, noisy_data)
        print("Kalman Metrics:", kalman_metrics)
        visualize_data_diff(clean_data, noisy_data, kalman_prediction, warmup=0, title_prefix='Kalman')
    if args.mode in ('wiener', 'all'):
        wiener_metrics, wiener_prediction = test_wiener(clean_data, noisy_data)
        print("Wiener Metrics:", wiener_metrics)
        visualize_data_diff(clean_data, noisy_data, wiener_prediction, warmup=0, title_prefix='Wiener')
    if args.mode in ('esn', 'all'):
        esn_metrics, esn_prediction = test_esn(clean_data, noisy_data, depth=3)
        print("ESN Metrics:", esn_metrics[-1])
    if args.mode in ('moving_average', 'all'):
        ma_metrics, ma_prediction = test_moving_average(clean_data, noisy_data)
        print("Moving Average Metrics:", ma_metrics)
        visualize_data_diff(clean_data, noisy_data, ma_prediction, warmup=0, title_prefix='Moving Average')
    plt.show()