小波变换完全指南:从原理到实践的深度解析
📌 引言
在信号处理领域,傅里叶变换长期以来一直是频域分析的主要工具。然而,傅里叶变换存在一个根本性的局限:它只能告诉我们信号中包含哪些频率成分,却无法告诉我们这些频率成分在什么时刻出现。这就像是你知道一首歌用了哪些音符,却不知道这些音符的演奏顺序。
小波变换(Wavelet Transform) 应运而生,它完美地解决了这个问题。小波变换不仅能够提供频率信息,还能同时提供时间信息,实现了信号的时频联合分析。本文将带你从零开始,深入理解小波变换的原理、实现和应用。
为什么需要小波变换?
- 时频局部化:能够同时获取信号的时间和频率信息
- 多尺度分析:可以在不同尺度上观察信号特征
- 自适应性:能够自动适应信号的局部特征
- 稀疏表示:许多实际信号在小波域具有稀疏性
📚 第一部分:小波变换的数学原理
1.1 从傅里叶变换到小波变换
傅里叶变换的局限性
傅里叶变换的定义如下:
F(ω) = ∫_{-∞}^{∞} f(t)e^{-jωt} dt
它使用正弦和余弦这样的全局基函数,无法捕捉信号的局部特征。
短时傅里叶变换(STFT)
为了获取时频信息,人们提出了短时傅里叶变换:
STFT(t, ω) = ∫_{-∞}^{∞} f(τ)w(τ-t)e^{-jωτ} dτ
其中 w(t) 是窗函数。但STFT存在固定窗口大小的问题,无法同时获得高时间分辨率和高频率分辨率。
小波变换的突破
小波变换通过使用可伸缩、可平移的小波基函数,实现了自适应的时频分辨率:
W(a, b) = (1/√a) ∫_{-∞}^{∞} f(t)ψ*((t-b)/a) dt
其中:
- ψ(t) 是母小波函数
- a 是尺度参数(scale),控制小波的伸缩
- b 是平移参数(translation),控制小波的位置
-
- 表示共轭
1.2 小波函数的要求
一个函数要成为小波函数,必须满足以下条件:
- 允许性条件(Admissibility Condition):
Cψ = ∫_{-∞}^{∞} |Ψ(ω)|²/|ω| dω < ∞
其中 Ψ(ω) 是 ψ(t) 的傅里叶变换。
- 零均值条件:
∫_{-∞}^{∞} ψ(t) dt = 0
这意味着小波函数必须是振荡的。
- 能量有限:
∫_{-∞}^{∞} |ψ(t)|² dt < ∞
- 时频局部化:小波函数在时域和频域都应该有良好的局部性。
1.3 连续小波变换(CWT)
定义
连续小波变换定义为:
CWT_x^ψ(a,b) = (1/√|a|) ∫_{-∞}^{∞} x(t)ψ*((t-b)/a) dt = <x, ψ_{a,b}>
其中小波族定义为:
ψ_{a,b}(t) = (1/√|a|)ψ((t-b)/a)
物理意义
-
尺度参数 a:
- a > 1:小波被拉伸,捕捉低频(粗糙)特征
- a < 1:小波被压缩,捕捉高频(细节)特征
-
平移参数 b:小波沿时间轴移动,扫描整个信号
逆变换
如果满足允许性条件,信号可以完全重构:
x(t) = (1/Cψ) ∫∫ CWT_x^ψ(a,b) ψ_{a,b}(t) da db/a²
1.4 离散小波变换(DWT)
尺度和平移的离散化
为了计算效率,我们对尺度和平移进行离散化:
a = a₀^j, b = ka₀^j b₀
通常取 a₀ = 2, b₀ = 1(二进离散),得到:
ψ_{j,k}(t) = 2^(-j/2) ψ(2^(-j)t - k)
多分辨率分析(MRA)
离散小波变换基于多分辨率分析理论,将信号分解为:
f(t) = Σ_k c_{J,k}φ_{J,k}(t) + Σ_{j=1}^J Σ_k d_{j,k}ψ_{j,k}(t)
其中:
- φ(t) 是尺度函数(father wavelet)
- ψ(t) 是小波函数(mother wavelet)
- c_{J,k} 是近似系数(approximation coefficients)
- d_{j,k} 是细节系数(detail coefficients)
Mallat算法
Mallat提出了快速小波分解算法,使用两个滤波器:
- 低通滤波器 h[n]:提取近似信息
- 高通滤波器 g[n]:提取细节信息
分解过程:
c_{j+1}[k] = Σ_n h[n-2k]c_j[n] (下采样)
d_{j+1}[k] = Σ_n g[n-2k]c_j[n] (下采样)
重构过程:
c_j[k] = Σ_n h[k-2n]c_{j+1}[n] + Σ_n g[k-2n]d_{j+1}[n]
1.5 常用小波基函数
1. Haar小波
最简单的小波,定义为:
ψ(t) = { 1, 0 ≤ t < 1/2{-1, 1/2 ≤ t < 1{ 0, 其他
特点:正交、紧支撑、不连续、计算最快
2. Daubechies小波(dbN)
由Ingrid Daubechies构造,N表示消失矩阶数。
特点:
- 正交
- 紧支撑
- 最大平滑性
- db1就是Haar小波
3. Symlet小波(symN)
Daubechies小波的改进版本,更加对称。
特点:近似对称、正交、紧支撑
4. Coiflet小波(coifN)
同时在尺度函数和小波函数上具有消失矩。
特点:尺度函数和小波函数都近似对称
5. Morlet小波
定义为:
ψ(t) = exp(-t²/2)cos(5t)
特点:
- 非正交
- 非紧支撑
- 良好的时频局部化
- 常用于连续小波变换
6. Mexican Hat小波(Ricker小波)
定义为:
ψ(t) = (1 - t²)exp(-t²/2)
特点:
- 二阶导数的高斯函数
- 对称
- 适合边缘检测
💻 第二部分:Python实现与仿真
2.1 环境准备
# 安装必要的库
# pip install pywavelets numpy matplotlib scipyimport numpy as np
import matplotlib.pyplot as plt
import pywt
from scipy import signal
import warnings
warnings.filterwarnings('ignore')# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False# 设置绘图风格
plt.style.use('seaborn-v0_8-darkgrid')
2.2 生成测试信号
def generate_test_signal(duration=1, fs=1000):"""生成包含多个频率成分的复合信号参数:duration: 信号时长(秒)fs: 采样频率(Hz)"""t = np.linspace(0, duration, int(fs * duration))# 成分1: 10Hz正弦波(持续整个时间)component1 = np.sin(2 * np.pi * 10 * t)# 成分2: 50Hz正弦波(0.3s到0.6s)component2 = np.zeros_like(t)mask2 = (t >= 0.3) & (t <= 0.6)component2[mask2] = np.sin(2 * np.pi * 50 * t[mask2])# 成分3: 100Hz正弦波(0.7s到0.9s)component3 = np.zeros_like(t)mask3 = (t >= 0.7) & (t <= 0.9)component3[mask3] = np.sin(2 * np.pi * 100 * t[mask3])# 添加噪声noise = 0.2 * np.random.randn(len(t))# 合成信号signal_clean = component1 + component2 + component3signal_noisy = signal_clean + noisereturn t, signal_clean, signal_noisy, component1, component2, component3# 生成信号
t, sig_clean, sig_noisy, comp1, comp2, comp3 = generate_test_signal()# 可视化
fig, axes = plt.subplots(3, 1, figsize=(14, 10))axes[0].plot(t, comp1, label='10Hz', alpha=0.7)
axes[0].plot(t, comp2, label='50Hz', alpha=0.7)
axes[0].plot(t, comp3, label='100Hz', alpha=0.7)
axes[0].set_title('信号各个成分', fontsize=14, fontweight='bold')
axes[0].set_xlabel('时间 (s)')
axes[0].set_ylabel('幅度')
axes[0].legend()
axes[0].grid(True)axes[1].plot(t, sig_clean, 'b', linewidth=1.5)
axes[1].set_title('干净的复合信号', fontsize=14, fontweight='bold')
axes[1].set_xlabel('时间 (s)')
axes[1].set_ylabel('幅度')
axes[1].grid(True)axes[2].plot(t, sig_noisy, 'r', linewidth=0.8, alpha=0.7)
axes[2].set_title('含噪声的复合信号', fontsize=14, fontweight='bold')
axes[2].set_xlabel('时间 (s)')
axes[2].set_ylabel('幅度')
axes[2].grid(True)plt.tight_layout()
plt.savefig('test_signal.png', dpi=300, bbox_inches='tight')
plt.show()
2.3 连续小波变换实现
def plot_cwt(signal, fs, wavelet='cmor1.5-1.0', scales=None):"""执行连续小波变换并可视化参数:signal: 输入信号fs: 采样频率wavelet: 小波类型scales: 尺度范围"""if scales is None:# 自动生成尺度scales = np.arange(1, 128)# 执行连续小波变换coefficients, frequencies = pywt.cwt(signal, scales, wavelet, 1/fs)# 计算功率谱power = np.abs(coefficients) ** 2# 创建时间轴t = np.arange(len(signal)) / fs# 绘图fig, axes = plt.subplots(2, 1, figsize=(14, 10))# 原始信号axes[0].plot(t, signal, 'k', linewidth=1)axes[0].set_title('原始信号', fontsize=14, fontweight='bold')axes[0].set_xlabel('时间 (s)')axes[0].set_ylabel('幅度')axes[0].grid(True)# 小波时频图im = axes[1].contourf(t, frequencies, power, levels=100, cmap='jet')axes[1].set_title(f'连续小波变换时频图 ({wavelet})', fontsize=14, fontweight='bold')axes[1].set_xlabel('时间 (s)')axes[1].set_ylabel('频率 (Hz)')axes[1].set_yscale('log')# 添加颜色条cbar = plt.colorbar(im, ax=axes[1])cbar.set_label('功率', rotation=270, labelpad=20)plt.tight_layout()plt.savefig('cwt_result.png', dpi=300, bbox_inches='tight')plt.show()return coefficients, frequencies# 对含噪信号进行连续小波变换
coeffs, freqs = plot_cwt(sig_noisy, fs=1000, wavelet='cmor1.5-1.0')
2.4 离散小波变换实现
def dwt_decomposition(signal, wavelet='db4', level=5):"""执行离散小波分解参数:signal: 输入信号wavelet: 小波类型level: 分解层数"""# 执行多层小波分解coeffs = pywt.wavedec(signal, wavelet, level=level)# coeffs包含: [cA_n, cD_n, cD_n-1, ..., cD_1]# cA_n: 第n层近似系数# cD_i: 第i层细节系数return coeffsdef plot_dwt_decomposition(signal, fs, wavelet='db4', level=5):"""可视化离散小波分解结果"""coeffs = dwt_decomposition(signal, wavelet, level)# 创建子图fig, axes = plt.subplots(level + 2, 1, figsize=(14, 3 * (level + 2)))# 绘制原始信号t = np.arange(len(signal)) / fsaxes[0].plot(t, signal, 'k', linewidth=1)axes[0].set_title('原始信号', fontsize=12, fontweight='bold')axes[0].set_ylabel('幅度')axes[0].grid(True)# 绘制近似系数cA = coeffs[0]t_cA = np.linspace(0, len(signal)/fs, len(cA))axes[1].plot(t_cA, cA, 'b', linewidth=1)axes[1].set_title(f'近似系数 (cA{level})', fontsize=12, fontweight='bold')axes[1].set_ylabel('幅度')axes[1].grid(True)# 绘制细节系数for i in range(1, level + 1):cD = coeffs[i]t_cD = np.linspace(0, len(signal)/fs, len(cD))axes[i + 1].plot(t_cD, cD, 'r', linewidth=1)axes[i + 1].set_title(f'细节系数 (cD{level - i + 1})', fontsize=12, fontweight='bold')axes[i + 1].set_ylabel('幅度')axes[i + 1].grid(True)axes[-1].set_xlabel('时间 (s)')plt.tight_layout()plt.savefig('dwt_decomposition.png', dpi=300, bbox_inches='tight')plt.show()return coeffs# 执行离散小波分解
coeffs = plot_dwt_decomposition(sig_noisy, fs=1000, wavelet='db4', level=5)
2.5 小波去噪实现
def wavelet_denoise(signal, wavelet='db4', level=5, threshold_type='soft'):"""使用小波变换进行信号去噪参数:signal: 输入信号wavelet: 小波类型level: 分解层数threshold_type: 阈值类型 ('soft' 或 'hard')"""# 分解coeffs = pywt.wavedec(signal, wavelet, level=level)# 计算阈值(使用通用阈值)sigma = np.median(np.abs(coeffs[-1])) / 0.6745threshold = sigma * np.sqrt(2 * np.log(len(signal)))# 对细节系数进行阈值处理coeffs_thresholded = [coeffs[0]] # 保留近似系数for i in range(1, len(coeffs)):if threshold_type == 'soft':# 软阈值coeff_thresh = pywt.threshold(coeffs[i], threshold, mode='soft')else:# 硬阈值coeff_thresh = pywt.threshold(coeffs[i], threshold, mode='hard')coeffs_thresholded.append(coeff_thresh)# 重构信号signal_denoised = pywt.waverec(coeffs_thresholded, wavelet)# 确保长度一致signal_denoised = signal_denoised[:len(signal)]return signal_denoised, threshold# 执行去噪
sig_denoised_soft, threshold = wavelet_denoise(sig_noisy, wavelet='db4', level=5, threshold_type='soft')
sig_denoised_hard, _ = wavelet_denoise(sig_noisy, wavelet='db4', level=5, threshold_type='hard')# 可视化去噪结果
fig, axes = plt.subplots(4, 1, figsize=(14, 12))axes[0].plot(t, sig_clean, 'g', linewidth=1.5, label='原始干净信号')
axes[0].set_title('原始干净信号(参考)', fontsize=12, fontweight='bold')
axes[0].set_ylabel('幅度')
axes[0].legend()
axes[0].grid(True)axes[1].plot(t, sig_noisy, 'r', linewidth=0.8, alpha=0.7, label='含噪信号')
axes[1].set_title('含噪声信号', fontsize=12, fontweight='bold')
axes[1].set_ylabel('幅度')
axes[1].legend()
axes[1].grid(True)axes[2].plot(t, sig_denoised_soft, 'b', linewidth=1.2, label='软阈值去噪')
axes[2].plot(t, sig_clean, 'g--', linewidth=1, alpha=0.5, label='参考信号')
axes[2].set_title(f'软阈值去噪结果 (阈值={threshold:.3f})', fontsize=12, fontweight='bold')
axes[2].set_ylabel('幅度')
axes[2].legend()
axes[2].grid(True)axes[3].plot(t, sig_denoised_hard, 'purple', linewidth=1.2, label='硬阈值去噪')
axes[3].plot(t, sig_clean, 'g--', linewidth=1, alpha=0.5, label='参考信号')
axes[3].set_title('硬阈值去噪结果', fontsize=12, fontweight='bold')
axes[3].set_xlabel('时间 (s)')
axes[3].set_ylabel('幅度')
axes[3].legend()
axes[3].grid(True)plt.tight_layout()
plt.savefig('wavelet_denoise.png', dpi=300, bbox_inches='tight')
plt.show()# 计算信噪比
def calculate_snr(clean, noisy):"""计算信噪比"""signal_power = np.sum(clean ** 2)noise_power = np.sum((clean - noisy) ** 2)snr = 10 * np.log10(signal_power / noise_power)return snrsnr_original = calculate_snr(sig_clean, sig_noisy)
snr_soft = calculate_snr(sig_clean, sig_denoised_soft)
snr_hard = calculate_snr(sig_clean, sig_denoised_hard)print(f"原始信号SNR: {snr_original:.2f} dB")
print(f"软阈值去噪后SNR: {snr_soft:.2f} dB (提升 {snr_soft - snr_original:.2f} dB)")
print(f"硬阈值去噪后SNR: {snr_hard:.2f} dB (提升 {snr_hard - snr_original:.2f} dB)")
2.6 小波包变换
def wavelet_packet_analysis(signal, wavelet='db4', level=4):"""小波包分析"""# 创建小波包对象wp = pywt.WaveletPacket(signal, wavelet, maxlevel=level)# 获取所有节点nodes = [node.path for node in wp.get_level(level, 'freq')]# 提取能量energies = []labels = []for node_path in nodes:node = wp[node_path]energy = np.sum(node.data ** 2)energies.append(energy)labels.append(node_path)# 归一化能量total_energy = sum(energies)energies_normalized = [e / total_energy * 100 for e in energies]# 可视化fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))# 能量分布柱状图colors = plt.cm.viridis(np.linspace(0, 1, len(energies)))ax1.bar(range(len(energies)), energies_normalized, color=colors)ax1.set_xlabel('频带编号')ax1.set_ylabel('能量百分比 (%)')ax1.set_title(f'小波包能量分布 (Level {level})', fontsize=14, fontweight='bold')ax1.grid(True, alpha=0.3)# 小波包系数热图coeffs_matrix = []for node_path in nodes:node = wp[node_path]coeffs_matrix.append(node.data[:min(200, len(node.data))]) # 只显示前200个点im = ax2.imshow(coeffs_matrix, aspect='auto', cmap='jet', interpolation='nearest')ax2.set_xlabel('时间采样点')ax2.set_ylabel('频带编号')ax2.set_title('小波包系数热图', fontsize=14, fontweight='bold')plt.colorbar(im, ax=ax2, label='系数幅度')plt.tight_layout()plt.savefig('wavelet_packet.png', dpi=300, bbox_inches='tight')plt.show()return wp, energies_normalized# 执行小波包分析
wp, energies = wavelet_packet_analysis(sig_noisy, wavelet='db4', level=4)
2.7 不同小波基的比较
def compare_wavelets(signal, wavelets=['haar', 'db4', 'sym5', 'coif3'], level=4):"""比较不同小波基的去噪效果"""fig, axes = plt.subplots(len(wavelets) + 1, 1, figsize=(14, 4 * (len(wavelets) + 1)))# 原始信号t = np.arange(len(signal)) / 1000axes[0].plot(t, signal, 'k', linewidth=1)axes[0].set_title('原始含噪信号', fontsize=12, fontweight='bold')axes[0].set_ylabel('幅度')axes[0].grid(True)results = {}# 对每个小波进行去噪for idx, wavelet_name in enumerate(wavelets):denoised, threshold = wavelet_denoise(signal, wavelet=wavelet_name, level=level)snr = calculate_snr(sig_clean, denoised)results[wavelet_name] = {'signal': denoised, 'snr': snr, 'threshold': threshold}axes[idx + 1].plot(t, denoised, linewidth=1.2, label=f'{wavelet_name} (SNR={snr:.2f}dB)')axes[idx + 1].plot(t, sig_clean, 'g--', linewidth=0.8, alpha=0.4, label='参考')axes[idx + 1].set_title(f'{wavelet_name}小波去噪 (阈值={threshold:.3f})', fontsize=12, fontweight='bold')axes[idx + 1].set_ylabel('幅度')axes[idx + 1].legend(loc='upper right')axes[idx + 1].grid(True)axes[-1].set_xlabel('时间 (s)')plt.tight_layout()plt.savefig('wavelet_comparison.png', dpi=300, bbox_inches='tight')plt.show()# 打印性能比较print("\n" + "="*60)print("不同小波基去噪性能比较")print("="*60)for wavelet_name, result in results.items():print(f"{wavelet_name:10s}: SNR = {result['snr']:6.2f} dB, 阈值 = {result['threshold']:.4f}")print("="*60)return results# 比较不同小波基
wavelet_results = compare_wavelets(sig_noisy, wavelets=['haar', 'db4', 'db8', 'sym5', 'coif3'])
🔬 第三部分:MATLAB实现
%% 小波变换MATLAB完整实现
% 作者: [你的名字]
% 日期: 2024clear all; close all; clc;%% 1. 生成测试信号
fs = 1000; % 采样频率
duration = 1; % 信号时长
t = 0:1/fs:duration-1/fs;% 生成复合信号
comp1 = sin(2*pi*10*t); % 10Hz分量
comp2 = zeros(size(t));
comp2(t>=0.3 & t<=0.6) = sin(2*pi*50*t(t>=0.3 & t<=0.6)); % 50Hz分量
comp3 = zeros(size(t));
comp3(t>=0.7 & t<=0.9) = sin(2*pi*100*t(t>=0.7 & t<=0.9)); % 100Hz分量signal_clean = comp1 + comp2 + comp3;
signal_noisy = signal_clean + 0.2*randn(size(t));%% 2. 信号可视化
figure('Position', [100, 100, 1200, 800]);subplot(3,1,1);
plot(t, comp1, 'LineWidth', 1.5); hold on;
plot(t, comp2, 'LineWidth', 1.5);
plot(t, comp3, 'LineWidth', 1.5);
title('信号各个成分', 'FontSize', 14, 'FontWeight', 'bold');
xlabel('时间 (s)'); ylabel('幅度');
legend('10Hz', '50Hz', '100Hz');
grid on;subplot(3,1,2);
plot(t, signal_clean, 'b', 'LineWidth', 1.5);
title('干净的复合信号', 'FontSize', 14, 'FontWeight', 'bold');
xlabel('时间 (s)'); ylabel('幅度');
grid on;subplot(3,1,3);
plot(t, signal_noisy, 'r', 'LineWidth', 1);
title('含噪声的复合信号', 'FontSize', 14, 'FontWeight', 'bold');
xlabel('时间 (s)'); ylabel('幅度');
grid on;%% 3. 连续小波变换 (CWT)
figure('Position', [100, 100, 1200, 600]);wavelet_name = 'cmor1.5-1.0'; % Complex Morlet小波
scales = 1:128;
[coeffs, frequencies] = cwt(signal_noisy, scales, wavelet_name, 1/fs);subplot(2,1,1);
plot(t, signal_noisy, 'k', 'LineWidth', 1);
title('原始信号', 'FontSize', 14, 'FontWeight', 'bold');
xlabel('时间 (s)'); ylabel('幅度');
grid on;subplot(2,1,2);
imagesc(t, frequencies, abs(coeffs).^2);
axis xy;
set(gca, 'YScale', 'log');
colormap jet;
colorbar;
title(['连续小波变换时频图 (', wavelet_name, ')'], 'FontSize', 14, 'FontWeight', 'bold');
xlabel('时间 (s)'); ylabel('频率 (Hz)');%% 4. 离散小波变换 (DWT)
wavelet_name = 'db4';
level = 5;
[C, L] = wavedec(signal_noisy, level, wavelet_name);% 提取各层系数
cA5 = appcoef(C, L, wavelet_name, 5);
cD5 = detcoef(C, L, 5);
cD4 = detcoef(C, L, 4);
cD3 = detcoef(C, L, 3);
cD2 = detcoef(C, L, 2);
cD1 = detcoef(C, L, 1);% 可视化分解结果
figure('Position', [100, 100, 1200, 1000]);subplot(7,1,1);
plot(t, signal_noisy, 'k'); title('原始信号');
ylabel('幅度'); grid on;subplot(7,1,2);
plot(linspace(0, duration, length(cA5)), cA5, 'b');
title('近似系数 cA5'); ylabel('幅度'); grid on;subplot(7,1,3);
plot(linspace(0, duration, length(cD5)), cD5, 'r');
title('细节系数 cD5'); ylabel('幅度'); grid on;subplot(7,1,4);
plot(linspace(0, duration, length(cD4)), cD4, 'r');
title('细节系数 cD4'); ylabel('幅度'); grid on;subplot(7,1,5);
plot(linspace(0, duration, length(cD3)), cD3, 'r');
title('细节系数 cD3'); ylabel('幅度'); grid on;subplot(7,1,6);
plot(linspace(0, duration, length(cD2)), cD2, 'r');
title('细节系数 cD2'); ylabel('幅度'); grid on;subplot(7,1,7);
plot(linspace(0, duration, length(cD1)), cD1, 'r');
title('细节系数 cD1'); xlabel('时间 (s)'); ylabel('幅度'); grid on;%% 5. 小波去噪
% 方法1: 使用wden函数
signal_denoised1 = wden(signal_noisy, 'rigrsure', 's', 'sln', level, wavelet_name);% 方法2: 手动阈值处理
[C, L] = wavedec(signal_noisy, level, wavelet_name);
sigma = median(abs(cD1)) / 0.6745;
threshold = sigma * sqrt(2*log(length(signal_noisy)));% 软阈值
C_thresh = wthresh(C, 's', threshold);
signal_denoised2 = waverec(C_thresh, L, wavelet_name);% 硬阈值
C_thresh_hard = wthresh(C, 'h', threshold);
signal_denoised3 = waverec(C_thresh_hard, L, wavelet_name);% 可视化去噪结果
figure('Position', [100, 100, 1200, 1000]);subplot(4,1,1);
plot(t, signal_clean, 'g', 'LineWidth', 1.5);
title('原始干净信号(参考)', 'FontSize', 12, 'FontWeight', 'bold');
ylabel('幅度'); grid on;subplot(4,1,2);
plot(t, signal_noisy, 'r', 'LineWidth', 0.8);
title('含噪声信号', 'FontSize', 12, 'FontWeight', 'bold');
ylabel('幅度'); grid on;subplot(4,1,3);
plot(t, signal_denoised2, 'b', 'LineWidth', 1.2); hold on;
plot(t, signal_clean, 'g--', 'LineWidth', 1, 'Color', [0 1 0 0.3]);
title(['软阈值去噪 (阈值=', num2str(threshold, '%.3f'), ')'], ...'FontSize', 12, 'FontWeight', 'bold');
ylabel('幅度'); legend('去噪信号', '参考信号'); grid on;subplot(4,1,4);
plot(t, signal_denoised3, 'm', 'LineWidth', 1.2); hold on;
plot(t, signal_clean, 'g--', 'LineWidth', 1, 'Color', [0 1 0 0.3]);
title('硬阈值去噪', 'FontSize', 12, 'FontWeight', 'bold');
xlabel('时间 (s)'); ylabel('幅度');
legend('去噪信号', '参考信号'); grid on;% 计算SNR
snr_original = 10*log10(sum(signal_clean.^2) / sum((signal_clean - signal_noisy).^2));
snr_soft = 10*log10(sum(signal_clean.^2) / sum((signal_clean - signal_denoised2).^2));
snr_hard = 10*log10(sum(signal_clean.^2) / sum((signal_clean - signal_denoised3).^2));fprintf('\n========================================\n');
fprintf('去噪性能评估\n');
fprintf('========================================\n');
fprintf('原始信号SNR: %.2f dB\n', snr_original);
fprintf('软阈值去噪后SNR: %.2f dB (提升 %.2f dB)\n', snr_soft, snr_soft - snr_original);
fprintf('硬阈值去噪后SNR: %.2f dB (提升 %.2f dB)\n', snr_hard, snr_hard - snr_original);
fprintf('========================================\n');%% 6. 小波包分析
figure('Position', [100, 100, 1200, 600]);wpt = wpdec(signal_noisy, level, wavelet_name);
[E, EE] = wenergy(wpt);subplot(2,1,1);
bar(E*100);
title(['小波包能量分布 (Level ', num2str(level), ')'], ...'FontSize', 14, 'FontWeight', 'bold');
xlabel('频带编号'); ylabel('能量百分比 (%)');
grid on;subplot(2,1,2);
plot(wprcoef(wpt, [level, 0])); hold on;
plot(wprcoef(wpt, [level, 1]));
plot(wprcoef(wpt, [level, 2]));
title('前三个频带的重构信号', 'FontSize', 14, 'FontWeight', 'bold');
xlabel('采样点'); ylabel('幅度');
legend('频带0', '频带1', '频带2');
grid on;%% 7. 小波族比较
wavelets = {'haar', 'db4', 'db8', 'sym5', 'coif3'};
figure('Position', [100, 100, 1400, 1000]);for i = 1:length(wavelets)wname = wavelets{i};[C, L] = wavedec(signal_noisy, level, wname);% 计算阈值cD_temp = detcoef(C, L, 1);sigma = median(abs(cD_temp)) / 0.6745;thr = sigma * sqrt(2*log(length(signal_noisy)));% 去噪C_thresh = wthresh(C, 's', thr);denoised = waverec(C_thresh, L, wname);% 计算SNRsnr = 10*log10(sum(signal_clean.^2) / sum((signal_clean - denoised).^2));% 绘图subplot(length(wavelets), 1, i);plot(t, denoised, 'LineWidth', 1.2); hold on;plot(t, signal_clean, 'g--', 'LineWidth', 0.8, 'Color', [0 1 0 0.3]);title([wname, ' 小波去噪 (SNR=', num2str(snr, '%.2f'), 'dB, 阈值=', ...num2str(thr, '%.3f'), ')'], 'FontSize', 11, 'FontWeight', 'bold');ylabel('幅度');legend('去噪信号', '参考信号', 'Location', 'northeast');grid on;
end
xlabel('时间 (s)');fprintf('\n程序执行完毕!\n');
🎯 第四部分:实际应用案例
4.1 心电信号(ECG)去噪
def ecg_wavelet_denoise(ecg_signal, fs, wavelet='db6', level=8):"""心电信号小波去噪ECG信号特点:- 主要频率: 0.5-40 Hz- QRS波群: 10-25 Hz- 基线漂移: < 0.5 Hz- 肌电干扰: > 40 Hz"""# 多层小波分解coeffs = pywt.wavedec(ecg_signal, wavelet, level=level)# 去除低频基线漂移 (cA)coeffs[0] = np.zeros_like(coeffs[0])# 对高频细节进行自适应阈值处理for i in range(1, len(coeffs)):# 计算每层的自适应阈值sigma = np.median(np.abs(coeffs[i])) / 0.6745threshold = sigma * np.sqrt(2 * np.log(len(coeffs[i])))# 软阈值coeffs[i] = pywt.threshold(coeffs[i], threshold, mode='soft')# 重构信号ecg_denoised = pywt.waverec(coeffs, wavelet)ecg_denoised = ecg_denoised[:len(ecg_signal)]return ecg_denoised# 生成模拟ECG信号
def generate_ecg_like_signal(duration=5, fs=500):"""生成类ECG信号"""t = np.linspace(0, duration, int(fs * duration))# 基础心跳信号 (简化的QRS波群)heart_rate = 75 # 75 bpmbeat_interval = 60 / heart_rateecg = np.zeros_like(t)for beat_time in np.arange(0, duration, beat_interval):# QRS波群 (简化为高斯波)qrs = signal.gaussian(int(0.1 * fs), std=int(0.02 * fs))start_idx = int(beat_time * fs)end_idx = min(start_idx + len(qrs), len(ecg))ecg[start_idx:end_idx] += qrs[:end_idx - start_idx]# 添加基线漂移baseline_wander = 0.3 * np.sin(2 * np.pi * 0.2 * t)# 添加高频肌电干扰emg_noise = 0.1 * np.random.randn(len(t))# 添加工频干扰 (50Hz)powerline_noise = 0.15 * np.sin(2 * np.pi * 50 * t)ecg_noisy = ecg + baseline_wander + emg_noise + powerline_noisereturn t, ecg, ecg_noisy# 生成并去噪ECG信号
t_ecg, ecg_clean, ecg_noisy = generate_ecg_like_signal()
ecg_denoised = ecg_wavelet_denoise(ecg_noisy, fs=500)# 可视化
fig, axes = plt.subplots(3, 1, figsize=(14, 10))axes[0].plot(t_ecg, ecg_clean, 'g', linewidth=1.5)
axes[0].set_title('理想ECG信号', fontsize=14, fontweight='bold')
axes[0].set_ylabel('幅度 (mV)')
axes[0].grid(True)axes[1].plot(t_ecg, ecg_noisy, 'r', linewidth=1, alpha=0.7)
axes[1].set_title('含噪ECG信号 (基线漂移 + 肌电干扰 + 工频干扰)', fontsize=14, fontweight='bold')
axes[1].set_ylabel('幅度 (mV)')
axes[1].grid(True)axes[2].plot(t_ecg, ecg_denoised, 'b', linewidth=1.2)
axes[2].plot(t_ecg, ecg_clean, 'g--', linewidth=1, alpha=0.4)
axes[2].set_title('小波去噪后的ECG信号', fontsize=14, fontweight='bold')
axes[2].set_xlabel('时间 (s)')
axes[2].set_ylabel('幅度 (mV)')
axes[2].legend(['去噪信号', '参考信号'])
axes[2].grid(True)plt.tight_layout()
plt.savefig('ecg_wavelet_denoise.png', dpi=300, bbox_inches='tight')
plt.show()
4.2 加速度传感器信号处理
def generate_accelerometer_signal(duration=10, fs=1000):"""生成模拟加速度传感器信号包含步态信号和振动干扰"""t = np.linspace(0, duration, int(fs * duration))# 步态信号 (约2Hz)step_freq = 2.0gait_signal = 0.5 * np.sin(2 * np.pi * step_freq * t) + \0.2 * np.sin(2 * np.pi * 2 * step_freq * t)# 添加突发振动 (模拟碰撞)impact_times = [2.5, 5.0, 7.5]impact_signal = np.zeros_like(t)for impact_t in impact_times:impact_idx = int(impact_t * fs)impact_duration = int(0.1 * fs)if impact_idx + impact_duration < len(t):impact_signal[impact_idx:impact_idx + impact_duration] = \3.0 * np.exp(-10 * np.linspace(0, 0.1, impact_duration)) * \np.sin(2 * np.pi * 50 * np.linspace(0, 0.1, impact_duration))# 高频振动噪声vibration_noise = 0.15 * np.random.randn(len(t))# 合成信号acc_signal = gait_signal + impact_signal + vibration_noisereturn t, gait_signal, acc_signaldef detect_impact_events(signal, fs, wavelet='db4', level=6):"""使用小波变换检测冲击事件"""# 小波分解coeffs = pywt.wavedec(signal, wavelet, level=level)# 使用中频细节系数检测冲击 (cD3-cD4)impact_coeff = coeffs[3] + coeffs[4]# 阈值检测threshold = 3 * np.std(impact_coeff)impact_indices = np.where(np.abs(impact_coeff) > threshold)[0]# 转换为原始信号的索引scale_factor = len(signal) // len(impact_coeff)impact_times = impact_indices * scale_factor / fsreturn impact_times, impact_coeff# 生成信号
t_acc, gait_clean, acc_noisy = generate_accelerometer_signal()# 去噪
acc_denoised, _ = wavelet_denoise(acc_noisy, wavelet='db4', level=6)# 检测冲击事件
impact_times, impact_coeff = detect_impact_events(acc_noisy, fs=1000)# 可视化
fig, axes = plt.subplots(4, 1, figsize=(14, 12))axes[0].plot(t_acc, gait_clean, 'g', linewidth=1.5)
axes[0].set_title('理想步态信号', fontsize=12, fontweight='bold')
axes[0].set_ylabel('加速度 (m/s²)')
axes[0].grid(True)axes[1].plot(t_acc, acc_noisy, 'r', linewidth=1, alpha=0.7)
axes[1].set_title('含噪加速度信号 (步态 + 冲击 + 振动)', fontsize=12, fontweight='bold')
axes[1].set_ylabel('加速度 (m/s²)')
axes[1].grid(True)axes[2].plot(t_acc, acc_denoised, 'b', linewidth=1.2)
axes[2].set_title('小波去噪后的加速度信号', fontsize=12, fontweight='bold')
axes[2].set_ylabel('加速度 (m/s²)')
axes[2].grid(True)# 显示检测到的冲击事件
axes[3].plot(t_acc, acc_noisy, 'gray', linewidth=0.8, alpha=0.5, label='原始信号')
for impact_t in impact_times:axes[3].axvline(x=impact_t, color='r', linestyle='--', linewidth=2, alpha=0.7)
axes[3].set_title(f'冲击事件检测 (检测到 {len(impact_times)} 个事件)', fontsize=12, fontweight='bold')
axes[3].set_xlabel('时间 (s)')
axes[3].set_ylabel('加速度 (m/s²)')
axes[3].legend(['原始信号', '检测到的冲击'])
axes[3].grid(True)plt.tight_layout()
plt.savefig('accelerometer_analysis.png', dpi=300, bbox_inches='tight')
plt.show()print(f"\n检测到的冲击事件时刻: {impact_times} 秒")
4.3 图像去噪应用
def wavelet_image_denoise(image, wavelet='db1', level=2, threshold_factor=2.0):"""使用小波变换进行图像去噪"""# 2D小波分解coeffs = pywt.wavedec2(image, wavelet, level=level)# coeffs格式: [cAn, (cHn, cVn, cDn), ..., (cH1, cV1, cD1)]# cA: 近似系数, cH: 水平细节, cV: 垂直细节, cD: 对角细节# 估计噪声标准差sigma = np.median(np.abs(coeffs[-1][2])) / 0.6745threshold = threshold_factor * sigma * np.sqrt(2 * np.log(image.size))# 对细节系数进行阈值处理coeffs_thresholded = [coeffs[0]] # 保留近似系数for i in range(1, len(coeffs)):cH, cV, cD = coeffs[i]cH_thresh = pywt.threshold(cH, threshold, mode='soft')cV_thresh = pywt.threshold(cV, threshold, mode='soft')cD_thresh = pywt.threshold(cD, threshold, mode='soft')coeffs_thresholded.append((cH_thresh, cV_thresh, cD_thresh))# 重构图像image_denoised = pywt.waverec2(coeffs_thresholded, wavelet)# 确保尺寸一致image_denoised = image_denoised[:image.shape[0], :image.shape[1]]return image_denoised# 生成测试图像
def generate_test_image(size=256):"""生成测试图像(圆形和方形)"""image = np.zeros((size, size))# 添加圆形center1 = (size // 3, size // 3)radius1 = size // 6y, x = np.ogrid[:size, :size]mask1 = (x - center1[0])**2 + (y - center1[1])**2 <= radius1**2image[mask1] = 1.0# 添加方形start = 2 * size // 3 - size // 8end = 2 * size // 3 + size // 8image[start:end, start:end] = 0.8# 添加噪声noise = 0.2 * np.random.randn(size, size)image_noisy = image + noiseimage_noisy = np.clip(image_noisy, 0, 1)return image, image_noisy# 生成并处理图像
image_clean, image_noisy = generate_test_image(size=256)
image_denoised = wavelet_image_denoise(image_noisy, wavelet='db1', level=3)# 可视化
fig, axes = plt.subplots(2, 3, figsize=(15, 10))# 第一行:图像
axes[0, 0].imshow(image_clean, cmap='gray', vmin=0, vmax=1)
axes[0, 0].set_title('原始图像', fontsize=12, fontweight='bold')
axes[0, 0].axis('off')axes[0, 1].imshow(image_noisy, cmap='gray', vmin=0, vmax=1)
axes[0, 1].set_title('含噪图像', fontsize=12, fontweight='bold')
axes[0, 1].axis('off')axes[0, 2].imshow(image_denoised, cmap='gray', vmin=0, vmax=1)
axes[0, 2].set_title('小波去噪后', fontsize=12, fontweight='bold')
axes[0, 2].axis('off')# 第二行:差异图
axes[1, 0].imshow(image_clean, cmap='gray', vmin=0, vmax=1)
axes[1, 0].set_title('参考', fontsize=12, fontweight='bold')
axes[1, 0].axis('off')noise_map = np.abs(image_noisy - image_clean)
axes[1, 1].imshow(noise_map, cmap='hot')
axes[1, 1].set_title(f'噪声分布 (PSNR={calculate_psnr(image_clean, image_noisy):.2f}dB)', fontsize=12, fontweight='bold')
axes[1, 1].axis('off')error_map = np.abs(image_denoised - image_clean)
axes[1, 2].imshow(error_map, cmap='hot')
axes[1, 2].set_title(f'去噪误差 (PSNR={calculate_psnr(image_clean, image_denoised):.2f}dB)', fontsize=12, fontweight='bold')
axes[1, 2].axis('off')plt.tight_layout()
plt.savefig('image_wavelet_denoise.png', dpi=300, bbox_inches='tight')
plt.show()def calculate_psnr(img1, img2):"""计算峰值信噪比"""mse = np.mean((img1 - img2) ** 2)if mse == 0:return float('inf')max_pixel = 1.0psnr = 20 * np.log10(max_pixel / np.sqrt(mse))return psnr
🔧 第五部分:传感器实际验证
5.1 读取真实传感器数据
def read_sensor_data(filename):"""读取传感器数据文件支持CSV格式"""try:data = np.loadtxt(filename, delimiter=',', skiprows=1)return dataexcept:print(f"无法读取文件: {filename}")return Nonedef analyze_sensor_data(data, fs, sensor_type='accelerometer'):"""分析真实传感器数据参数:data: 传感器数据数组fs: 采样频率sensor_type: 传感器类型"""# 时间轴t = np.arange(len(data)) / fs# 小波去噪data_denoised, threshold = wavelet_denoise(data, wavelet='db6', level=6)# 小波分解coeffs = pywt.wavedec(data, 'db6', level=6)# 连续小波变换scales = np.arange(1, 128)cwt_coeffs, freqs = pywt.cwt(data_denoised, scales, 'cmor1.5-1.0', 1/fs)# 可视化fig = plt.figure(figsize=(16, 12))gs = fig.add_gridspec(3, 2, hspace=0.3, wspace=0.3)# 原始信号ax1 = fig.add_subplot(gs[0, :])ax1.plot(t, data, 'b', linewidth=0.8, alpha=0.7, label='原始信号')ax1.plot(t, data_denoised, 'r', linewidth=1.2, label='去噪信号')ax1.set_title(f'{sensor_type.upper()}传感器信号分析', fontsize=14, fontweight='bold')ax1.set_xlabel('时间 (s)')ax1.set_ylabel('幅度')ax1.legend()ax1.grid(True)# 时频图ax2 = fig.add_subplot(gs[1, :])im = ax2.contourf(t, freqs, np.abs(cwt_coeffs)**2, levels=100, cmap='jet')ax2.set_title('连续小波变换时频分析', fontsize=14, fontweight='bold')ax2.set_xlabel('时间 (s)')ax2.set_ylabel('频率 (Hz)')ax2.set_yscale('log')plt.colorbar(im, ax=ax2, label='能量')# 频谱对比ax3 = fig.add_subplot(gs[2, 0])f_original, psd_original = signal.welch(data, fs, nperseg=1024)f_denoised, psd_denoised = signal.welch(data_denoised, fs, nperseg=1024)ax3.semilogy(f_original, psd_original, 'b', alpha=0.7, label='原始信号')ax3.semilogy(f_denoised, psd_denoised, 'r', linewidth=1.5, label='去噪信号')ax3.set_title('功率谱密度', fontsize=12, fontweight='bold')ax3.set_xlabel('频率 (Hz)')ax3.set_ylabel('PSD (dB/Hz)')ax3.legend()ax3.grid(True)# 小波系数能量分布ax4 = fig.add_subplot(gs[2, 1])energies = [np.sum(np.abs(c)**2) for c in coeffs]total_energy = sum(energies)energy_percent = [e/total_energy*100 for e in energies]levels = [f'cA{len(coeffs)-1}'] + [f'cD{len(coeffs)-i}' for i in range(1, len(coeffs))]ax4.bar(levels, energy_percent, color='steelblue')ax4.set_title('小波系数能量分布', fontsize=12, fontweight='bold')ax4.set_xlabel('分解层级')ax4.set_ylabel('能量占比 (%)')ax4.grid(True, alpha=0.3)plt.savefig(f'{sensor_type}_analysis.png', dpi=300, bbox_inches='tight')plt.show()# 输出统计信息print("\n" + "="*60)print(f"{sensor_type.upper()}传感器数据分析结果")print("="*60)print(f"数据点数: {len(data)}")print(f"采样频率: {fs} Hz")print(f"信号时长: {len(data)/fs:.2f} 秒")print(f"原始信号 - 均值: {np.mean(data):.4f}, 标准差: {np.std(data):.4f}")print(f"去噪信号 - 均值: {np.mean(data_denoised):.4f}, 标准差: {np.std(data_denoised):.4f}")print(f"去噪阈值: {threshold:.4f}")print(f"信噪比提升: {calculate_snr(data_denoised, data - data_denoised):.2f} dB")print("="*60)return data_denoised, coeffs# 示例:生成模拟传感器数据并分析
# 在实际应用中,替换为真实传感器数据文件
simulated_sensor_data, _ = generate_test_signal(duration=5, fs=1000)
_, _, sensor_data, _, _, _ = generate_test_signal(duration=5, fs=1000)# 分析传感器数据
denoised_data, coeffs = analyze_sensor_data(sensor_data, fs=1000, sensor_type='accelerometer')
5.2 实时传感器数据处理
class RealtimeWaveletProcessor:"""实时小波变换处理器适用于流式传感器数据"""def __init__(self, wavelet='db4', level=4, buffer_size=1024):self.wavelet = waveletself.level = levelself.buffer_size = buffer_sizeself.buffer = np.array([])self.processed_count = 0def process_chunk(self, new_data):"""处理新的数据块"""# 将新数据添加到缓冲区self.buffer = np.concatenate([self.buffer, new_data])# 如果缓冲区足够大,进行处理if len(self.buffer) >= self.buffer_size:# 取出处理数据data_to_process = self.buffer[:self.buffer_size]# 小波去噪denoised, _ = wavelet_denoise(data_to_process, wavelet=self.wavelet, level=self.level)# 更新缓冲区(保留50%重叠)overlap = self.buffer_size // 2self.buffer = self.buffer[overlap:]self.processed_count += len(denoised) - overlapreturn denoised[:-overlap]return np.array([])def reset(self):"""重置处理器"""self.buffer = np.array([])self.processed_count = 0# 模拟实时数据流
def simulate_realtime_processing(duration=10, fs=1000, chunk_size=100):"""模拟实时传感器数据处理"""# 生成完整信号_, _, full_signal, _, _, _ = generate_test_signal(duration=duration, fs=fs)# 创建处理器processor = RealtimeWaveletProcessor(wavelet='db4', level=4, buffer_size=512)# 存储结果processed_signal = []# 模拟实时处理num_chunks = len(full_signal) // chunk_sizeprint("\n开始实时数据处理...")print(f"总数据点: {len(full_signal)}, 块大小: {chunk_size}, 块数量: {num_chunks}")for i in range(num_chunks):# 获取新数据块start_idx = i * chunk_sizeend_idx = start_idx + chunk_sizenew_chunk = full_signal[start_idx:end_idx]# 处理result = processor.process_chunk(new_chunk)# 存储结果if len(result) > 0:processed_signal.extend(result)# 显示进度if (i + 1) % 10 == 0:print(f"处理进度: {(i+1)/num_chunks*100:.1f}%")print("实时处理完成!")return np.array(processed_signal), full_signal# 执行实时处理模拟
processed_rt, original_rt = simulate_realtime_processing(duration=5, fs=1000, chunk_size=100)# 可视化结果
fig, axes = plt.subplots(2, 1, figsize=(14, 8))t_original = np.arange(len(original_rt)) / 1000
t_processed = np.arange(len(processed_rt)) / 1000axes[0].plot(t_original, original_rt, 'b', linewidth=0.8, alpha=0.7)
axes[0].set_title('原始实时数据流', fontsize=14, fontweight='bold')
axes[0].set_ylabel('幅度')
axes[0].grid(True)axes[1].plot(t_processed, processed_rt, 'r', linewidth=1.2)
axes[1].set_title('实时小波去噪结果', fontsize=14, fontweight='bold')
axes[1].set_xlabel('时间 (s)')
axes[1].set_ylabel('幅度')
axes[1].grid(True)plt.tight_layout()
plt.savefig('realtime_processing.png', dpi=300, bbox_inches='tight')
plt.show()
📊 第六部分:性能评估与对比
6.1 不同去噪方法对比
def compare_denoising_methods(signal_clean, signal_noisy, fs):"""对比不同去噪方法的性能"""# 方法1: 小波去噪denoised_wavelet, _ = wavelet_denoise(signal_noisy, wavelet='db6', level=5)# 方法2: 低通滤波from scipy.signal import butter, filtfiltnyquist = fs / 2cutoff = 40 # Hzb, a = butter(4, cutoff / nyquist, btype='low')denoised_lowpass = filtfilt(b, a, signal_noisy)# 方法3: 中值滤波from scipy.signal import medfiltdenoised_median = medfilt(signal_noisy, kernel_size=5)# 方法4: 移动平均window_size = 10denoised_moving_avg = np.convolve(signal_noisy, np.ones(window_size)/window_size, mode='same')# 计算性能指标methods = {'小波去噪': denoised_wavelet,'低通滤波': denoised_lowpass,'中值滤波': denoised_median,'移动平均': denoised_moving_avg}results = {}for name, denoised in methods.items():snr = calculate_snr(signal_clean, denoised)mse = np.mean((signal_clean - denoised) ** 2)corr = np.corrcoef(signal_clean, denoised)[0, 1]results[name] = {'SNR': snr, 'MSE': mse, 'Correlation': corr}# 可视化对比fig, axes = plt.subplots(len(methods) + 1, 1, figsize=(14, 3 * (len(methods) + 1)))t = np.arange(len(signal_clean)) / fs# 原始信号axes[0].plot(t, signal_clean, 'g', linewidth=1.5, label='干净信号')axes[0].plot(t, signal_noisy, 'r', linewidth=0.8, alpha=0.5, label='含噪信号')axes[0].set_title('原始信号对比', fontsize=12, fontweight='bold')axes[0].legend()axes[0].grid(True)# 各种方法的结果for idx, (name, denoised) in enumerate(methods.items()):axes[idx + 1].plot(t, denoised, linewidth=1.2, label=f'{name}结果')axes[idx + 1].plot(t, signal_clean, 'g--', linewidth=0.8, alpha=0.3, label='参考')snr = results[name]['SNR']mse = results[name]['MSE']corr = results[name]['Correlation']axes[idx + 1].set_title(f'{name} | SNR: {snr:.2f}dB | MSE: {mse:.4f} | 相关系数: {corr:.4f}',fontsize=11, fontweight='bold')axes[idx + 1].legend()axes[idx + 1].grid(True)axes[-1].set_xlabel('时间 (s)')plt.tight_layout()plt.savefig('denoising_methods_comparison.png', dpi=300, bbox_inches='tight')plt.show()# 打印性能表格print("\n" + "="*80)print("不同去噪方法性能对比")print("="*80)print(f"{'方法':<15} {'SNR (dB)':<15} {'MSE':<15} {'相关系数':<15}")print("-"*80)for name, metrics in results.items():print(f"{name:<15} {metrics['SNR']:<15.2f} {metrics['MSE']:<15.6f} {metrics['Correlation']:<15.4f}")print("="*80)return results# 执行对比
comparison_results = compare_denoising_methods(sig_clean, sig_noisy, fs=1000)
6.2 计算复杂度分析
import timedef benchmark_wavelet_methods(signal, wavelets=['haar', 'db4', 'db8', 'sym5'], levels=[3, 5, 7], iterations=100):"""测试不同小波和分解层数的计算时间"""results = {}print("\n" + "="*60)print("小波变换计算复杂度测试")print(f"信号长度: {len(signal)}, 迭代次数: {iterations}")print("="*60)for wavelet in wavelets:for level in levels:key = f"{wavelet}_L{level}"# 测试分解时间start_time = time.time()for _ in range(iterations):coeffs = pywt.wavedec(signal, wavelet, level=level)decomp_time = (time.time() - start_time) / iterations * 1000 # ms# 测试重构时间start_time = time.time()for _ in range(iterations):reconstructed = pywt.waverec(coeffs, wavelet)recon_time = (time.time() - start_time) / iterations * 1000 # msresults[key] = {'decomposition': decomp_time,'reconstruction': recon_time,'total': decomp_time + recon_time}print(f"{key:<15} 分解: {decomp_time:6.3f}ms, 重构: {recon_time:6.3f}ms, 总计: {decomp_time + recon_time:6.3f}ms")print("="*60)# 可视化fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))# 按小波类型分组for wavelet in wavelets:decomp_times = []recon_times = []for level in levels:key = f"{wavelet}_L{level}"decomp_times.append(results[key]['decomposition'])recon_times.append(results[key]['reconstruction'])ax1.plot(levels, decomp_times, marker='o', label=f'{wavelet} 分解', linewidth=2)ax2.plot(levels, recon_times, marker='s', label=f'{wavelet} 重构', linewidth=2)ax1.set_xlabel('分解层数')ax1.set_ylabel('时间 (ms)')ax1.set_title('小波分解时间', fontsize=14, fontweight='bold')ax1.legend()ax1.grid(True)ax2.set_xlabel('分解层数')ax2.set_ylabel('时间 (ms)')ax2.set_title('小波重构时间', fontsize=14, fontweight='bold')ax2.legend()ax2.grid(True)plt.tight_layout()plt.savefig('wavelet_performance_benchmark.png', dpi=300, bbox_inches='tight')plt.show()return results# 执行基准测试
benchmark_results = benchmark_wavelet_methods(sig_noisy, iterations=100)
🎓 第七部分:高级应用
7.1 小波变换用于特征提取
def extract_wavelet_features(signal, wavelet='db4', level=5):"""从信号中提取小波特征用于机器学习和模式识别"""# 小波分解coeffs = pywt.wavedec(signal, wavelet, level=level)features = {}# 对每层系数提取统计特征for i, coeff in enumerate(coeffs):if i == 0:prefix = f'cA{level}'else:prefix = f'cD{level - i + 1}'# 统计特征features[f'{prefix}_mean'] = np.mean(coeff)features[f'{prefix}_std'] = np.std(coeff)features[f'{prefix}_energy'] = np.sum(coeff ** 2)features[f'{prefix}_max'] = np.max(np.abs(coeff))features[f'{prefix}_entropy'] = -np.sum(coeff**2 * np.log(coeff**2 + 1e-12))# 过零率zero_crossings = np.sum(np.diff(np.sign(coeff)) != 0)features[f'{prefix}_zcr'] = zero_crossings / len(coeff)return featuresdef visualize_feature_extraction(signals_dict, wavelet='db4', level=5):"""可视化不同信号的小波特征"""all_features = {}for signal_name, signal_data in signals_dict.items():features = extract_wavelet_features(signal_data, wavelet, level)all_features[signal_name] = features# 创建特征矩阵feature_names = list(all_features[list(signals_dict.keys())[0]].keys())feature_matrix = np.array([[all_features[name][feat] for feat in feature_names] for name in signals_dict.keys()])# 归一化from sklearn.preprocessing import StandardScalerscaler = StandardScaler()feature_matrix_scaled = scaler.fit_transform(feature_matrix)# 可视化热图fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))im1 = ax1.imshow(feature_matrix.T, aspect='auto', cmap='viridis')ax1.set_xticks(range(len(signals_dict)))ax1.set_xticklabels(signals_dict.keys(), rotation=45, ha='right')ax1.set_yticks(range(len(feature_names)))ax1.set_yticklabels(feature_names, fontsize=8)ax1.set_title('原始特征值', fontsize=14, fontweight='bold')plt.colorbar(im1, ax=ax1)im2 = ax2.imshow(feature_matrix_scaled.T, aspect='auto', cmap='RdBu', vmin=-3, vmax=3)ax2.set_xticks(range(len(signals_dict)))ax2.set_xticklabels(signals_dict.keys(), rotation=45, ha='right')ax2.set_yticks(range(len(feature_names)))ax2.set_yticklabels(feature_names, fontsize=8)ax2.set_title('标准化特征值', fontsize=14, fontweight='bold')plt.colorbar(im2, ax=ax2)plt.tight_layout()plt.savefig('wavelet_feature_extraction.png', dpi=300, bbox_inches='tight')plt.show()return all_features, feature_matrix# 生成不同类型的信号
signals = {'正常信号': sig_clean[:1000],'含噪信号': sig_noisy[:1000],'去噪信号': sig_denoised_soft[:1000],'低频信号': comp1[:1000],'高频信号': comp3[:1000]
}features, feature_matrix = visualize_feature_extraction(signals)
7.2 小波变换用于信号压缩
def wavelet_compression(signal, wavelet='db4', level=5, compression_ratio=0.1):"""使用小波变换进行信号压缩参数:signal: 输入信号wavelet: 小波类型level: 分解层数compression_ratio: 保留系数的比例 (0-1)"""# 小波分解coeffs = pywt.wavedec(signal, wavelet, level=level)# 展平所有系数coeff_arr, coeff_slices = pywt.coeffs_to_array(coeffs)# 计算阈值(保留最大的N个系数)num_keep = int(len(coeff_arr) * compression_ratio)threshold = np.sort(np.abs(coeff_arr))[-num_keep]# 应用阈值coeff_arr_compressed = coeff_arr.copy()coeff_arr_compressed[np.abs(coeff_arr_compressed) < threshold] = 0# 重构系数列表coeffs_compressed = pywt.array_to_coeffs(coeff_arr_compressed, coeff_slices, output_format='wavedec')# 重构信号signal_reconstructed = pywt.waverec(coeffs_compressed, wavelet)signal_reconstructed = signal_reconstructed[:len(signal)]# 计算压缩统计num_nonzero_original = len(coeff_arr)num_nonzero_compressed = np.count_nonzero(coeff_arr_compressed)actual_compression_ratio = num_nonzero_compressed / num_nonzero_original# 计算重构误差mse = np.mean((signal - signal_reconstructed) ** 2)psnr = 10 * np.log10(np.max(signal)**2 / mse) if mse > 0 else float('inf')stats = {'original_coeffs': num_nonzero_original,'compressed_coeffs': num_nonzero_compressed,'compression_ratio': actual_compression_ratio,'mse': mse,'psnr': psnr}return signal_reconstructed, coeffs_compressed, statsdef test_compression_ratios(signal, ratios=[0.01, 0.05, 0.1, 0.2, 0.5]):"""测试不同压缩比的效果"""fig, axes = plt.subplots(len(ratios) + 1, 1, figsize=(14, 3 * (len(ratios) + 1)))t = np.arange(len(signal)) / 1000# 原始信号axes[0].plot(t, signal, 'k', linewidth=1.5)axes[0].set_title('原始信号', fontsize=12, fontweight='bold')axes[0].set_ylabel('幅度')axes[0].grid(True)results = {}# 不同压缩比for idx, ratio in enumerate(ratios):reconstructed, _, stats = wavelet_compression(signal, compression_ratio=ratio)results[ratio] = statsaxes[idx + 1].plot(t, reconstructed, linewidth=1.2)axes[idx + 1].set_title(f'压缩比: {ratio:.1%} | 保留系数: {stats["compressed_coeffs"]}/{stats["original_coeffs"]} | PSNR: {stats["psnr"]:.2f}dB',fontsize=11, fontweight='bold')axes[idx + 1].set_ylabel('幅度')axes[idx + 1].grid(True)axes[-1].set_xlabel('时间 (s)')plt.tight_layout()plt.savefig('wavelet_compression.png', dpi=300, bbox_inches='tight')plt.show()# 打印统计表print("\n" + "="*70)print("小波压缩性能统计")print("="*70)print(f"{'压缩比':<15} {'保留系数数量':<20} {'MSE':<15} {'PSNR (dB)':<15}")print("-"*70)for ratio, stats in results.items():print(f"{ratio:<15.1%} {stats['compressed_coeffs']:<20} "f"{stats['mse']:<15.6f} {stats['psnr']:<15.2f}")print("="*70)return results# 测试压缩
compression_results = test_compression_ratios(sig_clean)
📝 总结与展望
主要内容回顾
本文详细介绍了小波变换的理论基础、实现方法和实际应用:
-
理论基础
- 小波变换的数学定义
- 连续小波变换(CWT)和离散小波变换(DWT)
- 多分辨率分析理论
- 常用小波基函数
-
Python实现
- 使用PyWavelets库进行小波分解和重构
- 实现小波去噪算法
- 小波包分析
- 实时信号处理
-
MATLAB实现
- 完整的MATLAB实现代码
- 各种小波函数的使用
- 可视化和性能评估
-
实际应用
- 心电信号(ECG)去噪
- 加速度传感器信号处理
- 图像去噪
- 特征提取
- 信号压缩
优缺点分析
优点:
- ✅ 时频局部化特性优秀
- ✅ 多尺度分析能力强
- ✅ 适合处理非平稳信号
- ✅ 应用领域广泛
缺点:
- ❌ 参数选择需要经验
- ❌ 边界效应处理复杂
- ❌ 计算复杂度较高
应用领域
- 🏥 生物医学信号处理: ECG、EEG、EMG等
- 🔊 音频信号处理: 语音增强、音乐分析
- 📷 图像处理: 压缩、去噪、边缘检测
- 📡 通信系统: 信道编码、调制解调
- 🏭 故障诊断: 机械振动分析
- 💰 金融分析: 时间序列预测
未来发展方向
- 深度学习与小波的结合: WaveletNet, Wavelet-CNN
- 自适应小波: 根据信号特性自动选择最优小波基
- 稀疏表示: 压缩感知中的应用
- 量子小波变换: 量子计算领域的探索
📚 参考资料
-
Mallat, S. (1989). “A theory for multiresolution signal decomposition: the wavelet representation.” IEEE Transactions on Pattern Analysis and Machine Intelligence.
-
Daubechies, I. (1992). Ten Lectures on Wavelets. SIAM.
-
Addison, P. S. (2002). The Illustrated Wavelet Transform Handbook. CRC Press.
-
Torrence, C., & Compo, G. P. (1998). “A practical guide to wavelet analysis.” Bulletin of the American Meteorological Society.
-
PyWavelets Documentation: https://pywavelets.readthedocs.io/
-
MATLAB Wavelet Toolbox Documentation: https://www.mathworks.com/help/wavelet/
💡 实践建议
-
小波选择:
- 平滑信号:db4-db8
- ECG信号:db6, sym5
- 图像处理:haar, db1, bior
-
分解层数:
- 一般选择 log2(N) 其中N为信号长度
- 常用范围:3-8层
-
阈值选择:
- 通用阈值: σ√(2ln(N))
- 自适应阈值根据具体应用调整
-
性能优化:
- 使用FFT加速
- 考虑边界处理
- 批处理提高效率