denoise_RC/exp/tools.py

237 lines
9.2 KiB
Python
Raw Normal View History

import time
import numpy as np
from matplotlib import pyplot as plt
from reservoirpy import datasets
def print_exec_time(func):
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
print(f"Function {func.__name__} executed in {time.time() - start:.4f} seconds.")
return result
return wrapper
def add_noise(data, noise_type='gaussian', intensity=0.1, **kwargs):
"""
为输入数据添加噪声
参数:
data: numpy 数组, 原始数据.
noise_type: str, 噪声类型可选 'gaussian'高斯白噪声'colored' '1/f'色噪声'impulse'脉冲噪声'sine'正弦噪声
intensity: float, 噪声强度.
kwargs: 针对某些噪声类型的额外参数例如:
- 对于 'impulse': prob (脉冲发生概率, 默认 0.01)
- 对于 'sine' : freq (正弦信号频率, 默认 5)
返回:
添加噪声后的数据.
"""
noise_type = noise_type.lower()
# 确保噪声与数据形状匹配,处理多维数据
if data.ndim > 1:
n_samples, n_dimensions = data.shape
else:
n_samples = len(data)
n_dimensions = 1
if noise_type in ['gaussian', 'gaussian white']:
noise = intensity * np.random.randn(*data.shape)
return data + noise
elif noise_type in ['colored', '1/f']:
# 对于多维数据为每个维度单独生成1/f噪声
if data.ndim > 1:
noise = np.zeros_like(data)
for dim in range(n_dimensions):
f = np.fft.fftfreq(n_samples)
f[0] = 1e-10
noise_complex = np.random.randn(n_samples) + 1j * np.random.randn(n_samples)
factor = intensity / np.sqrt(np.abs(f))
noise[:, dim] = np.fft.ifft(noise_complex * factor).real
else:
f = np.fft.fftfreq(n_samples)
f[0] = 1e-10
noise_complex = np.random.randn(n_samples) + 1j * np.random.randn(n_samples)
factor = intensity / np.sqrt(np.abs(f))
noise = np.fft.ifft(noise_complex * factor).real
return data + noise
elif noise_type == 'impulse':
noise = np.zeros_like(data)
prob = kwargs.get('prob', 0.01)
mask = np.random.rand(*data.shape) < prob
impulse = intensity * (2 * np.random.rand(*data.shape) - 1)
noise[mask] = impulse[mask]
return data + noise
elif noise_type == 'sine':
t = np.arange(n_samples)
freq = kwargs.get('freq', 5)
# 对于多维数据,确保正弦波形状正确
if data.ndim > 1:
sine_wave = intensity * np.sin(2 * np.pi * freq * t / n_samples).reshape(-1, 1)
# 扩展到与数据相同的维度
sine_wave = np.tile(sine_wave, (1, n_dimensions))
else:
sine_wave = intensity * np.sin(2 * np.pi * freq * t / n_samples)
return data + sine_wave
else:
raise ValueError("未知的噪声类型。可选项:'gaussian', 'colored' (1/f), 'impulse', 'sine'")
def load_data(system='lorenz', init='random', noise=None, intensity=0.1, h=0.01, n_timesteps=10000, transient=1000, normlization=True, **kwargs):
"""
加载混沌系统数据.
参数:
system: str, 混沌系统类型, 可选 'lorenz', 'rossler', 'multiscroll', 'kuramoto_sivashinsky'
init: 初始值. 如果为 'random' 则随机初始化否则期望传入数组形式的初始值.
noise: None, str str 列表, 指定要添加的噪声类型. 如果是列表则混合各噪声 (取平均) 作为混合噪声.
-- 可选 'gaussian'高斯白噪声'colored' '1/f'色噪声'impulse'脉冲噪声'sine'正弦噪声
intensity: float, 噪声强度.
h: float, 时间步长 (TimeDelta).
n_timesteps: int, 时间步数.
transient: int, 需丢弃的时间步数.
normlization: bool, 是否对数据进行归一化处理.
kwargs: 其他系统参数.
返回:
(clean_data, noisy_data): 两个 numpy 数组, 分别为干净数据和添加噪声后的数据.
各系统默认值:
- lorenz: rho=28, sigma=10, beta=8/3, h=0.03, x0=[1, 1, 1]
- rossler: a=0.2, b=0.2, c=5.7, h=0.1, x0=[-0.1, 0, 0.02]
- multiscroll: a=40, b=3, c=28, h=0.01, x0=[-0.1, 0.5, -0.6]
- kuramoto_sivashinsky: N=128, M=16, h=0.25, x0=None
"""
system = system.lower()
if init == 'random':
if system in ['lorenz', 'rossler', 'multiscroll']:
# 默认3维初始值
state = np.random.rand(3)
elif system in ['kuramoto_sivashinsky']:
state = np.random.rand(1)
else:
raise ValueError("未知的混沌系统类型。")
else:
state = np.array(init, dtype=float)
if system == 'lorenz':
'''
(function) def lorenz(
n_timesteps: int,
rho: float = 28,
sigma: float = 10,
beta: float = 8 / 3,
x0: list | ndarray = [1, 1, 1],
h: float = 0.03,
**kwargs: Any
) -> ndarray
'''
sigma = kwargs.get('sigma', 10.0)
rho = kwargs.get('rho', 28.0)
beta = kwargs.get('beta', 8/3)
clean_data = datasets.lorenz(n_timesteps=n_timesteps, h=h, sigma=sigma, rho=rho, beta=beta, x0=state)
elif system == 'rossler':
'''
(function) def rossler(
n_timesteps: int,
a: float = 0.2,
b: float = 0.2,
c: float = 5.7,
x0: list | ndarray = [-0.1, 0, 0.02],
h: float = 0.1,
**kwargs: Any
) -> ndarray
'''
a = kwargs.get('a', 0.2)
b = kwargs.get('b', 0.2)
c = kwargs.get('c', 5.7)
clean_data = datasets.rossler(n_timesteps=n_timesteps, h=h, a=a, b=b, c=c, x0=state)
elif system == 'multiscroll':
'''
(function) def multiscroll(
n_timesteps: int,
a: float = 40,
b: float = 3,
c: float = 28,
x0: list | ndarray = [-0.1, 0.5, -0.6],
h: float = 0.01,
**kwargs: Any
) -> ndarray
'''
a = kwargs.get('a', 40)
b = kwargs.get('b', 3)
c = kwargs.get('c', 28)
clean_data = datasets.multiscroll(n_timesteps=n_timesteps, h=h, x0=state, a=a, b=b, c=c)
elif system == 'kuramoto_sivashinsky':
'''
(function) def kuramoto_sivashinsky(
n_timesteps: int,
warmup: int = 0,
N: int = 128,
M: float = 16,
x0: list | ndarray = None,
h: float = 0.25
) -> ndarray
'''
clean_data = datasets.kuramoto_sivashinsky(n_timesteps=n_timesteps, h=h, **kwargs)
else:
raise ValueError("未知的混沌系统类型。可选项: 'lorenz', 'rossler', 'multiscroll', 'kuramoto_sivashinsky'")
if normlization:
clean_data = (clean_data - clean_data.mean(axis=0)) / clean_data.std(axis=0) # Z-score 归一化
clean_data = clean_data[transient:]
# 添加噪声
if noise is not None:
if isinstance(noise, list):
noise_sum = np.zeros_like(clean_data)
for nt in noise:
noise_sum += add_noise(np.zeros_like(clean_data), noise_type=nt, intensity=intensity, **kwargs)
mixed_noise = noise_sum / len(noise)
noisy_data = clean_data + mixed_noise
elif isinstance(noise, str):
noisy_data = add_noise(clean_data, noise_type=noise, intensity=intensity, **kwargs)
else:
raise ValueError("噪声参数 noise 必须为字符串或字符串列表。")
else:
noisy_data = clean_data.copy()
return clean_data, noisy_data
def visualize_data_diff(clean_data, noisy_data, prediction, warmup=0, title_prefix=""):
plt.figure(figsize=(12, 6))
for i in range(3):
plt.subplot(3, 2, 2*i+1)
plt.plot(noisy_data[warmup:, i], label='noisy')
plt.plot(clean_data[warmup:, i], label='clean')
plt.title(f"{title_prefix} Dim {i+1} (Noisy vs Clean)")
plt.legend()
plt.subplot(3, 2, 2*i+2)
plt.plot(prediction[warmup:, i], label='prediction')
plt.plot(clean_data[warmup:, i], label='clean')
plt.title(f"{title_prefix} Dim {i+1} (Prediction vs Clean)")
plt.legend()
plt.tight_layout()
def visualize_psd_diff(clean_data, noisy_data, prediction, warmup=0, title_prefix=""):
plt.figure(figsize=(12, 6))
for i in range(3):
plt.subplot(3, 1, i+1)
plt.psd(clean_data[warmup:, i], NFFT=1024, label='clean')
plt.psd(noisy_data[warmup:, i], NFFT=1024, label='noisy')
plt.psd(prediction[warmup:, i], NFFT=1024, label='prediction')
plt.title(f"{title_prefix} Dim {i+1} PSD Comparison")
plt.legend()
plt.tight_layout()
if __name__ == "__main__":
# 测试数据加载和噪声添加
clean_data, noisy_data = load_data(system='lorenz', noise=['gaussian', 'colored'], intensity=0.1, n_timesteps=10000, transient=1000)
print("Clean Data Shape:", clean_data.shape)
print("Noisy Data Shape:", noisy_data.shape)