当前位置: 首页 > news >正文

电路笔记(信号):相关匹配/模板匹配 (Correlation/Template Matching) 解码 + python实现示例

相关性解码

  • 相关匹配/模板匹配 (Correlation/Template Matching) 解码方法特别适用于信号质量较差(有噪声、失真)或需要高可靠性的场景。其核心思想是:将接收到的波形片段与预先定义的“理想”波形模板进行对比,计算它们的相似度(相关值),并选择相似度最高的模板作为解码结果。它的优势在低信噪比、高可靠性要求的场景下尤为突出。随着计算能力的提升,这种经典的信号处理方法在现代智能系统(如自动驾驶、智能医疗设备)中依然扮演着不可或缺的角色:
    • 数字调制解调
      • BPSK/QPSK 解调:接收端使用本地生成的载波和已知的符号波形作为模板,与接收到的信号做相关,以判决发送的是 0 还是 1 或哪个相位。
      • 扩频通信 (如 CDMA):利用伪随机码(PN码)作为模板进行相关解扩。只有使用相同码的接收机才能从宽带噪声中“拉出”有用信号,实现多址接入和抗干扰。
    • 同步字检测
      • 在数据帧开头插入特定的“同步头”(如 0101... 或巴克码)。接收端用此同步头作为模板,在接收流中滑动匹配,一旦找到高相关值,即确认帧起始位置,完成帧同步。
    • 雷达通信一体化:在通信信号中嵌入雷达探测功能,利用相关性检测目标回波。
优点缺点
抗噪声能力强:即使信号有失真,只要整体形状匹配,仍能正确解码。计算量大:每个位都需要 2 次点积运算(TEMPLATE_LEN 次乘加),对 CPU 要求高。
可靠性高:相比简单阈值法,误码率更低。需要预知采样率和位率:模板依赖于固定的 SAMPLE_RATEBIT_RATE
易于实现:逻辑清晰,代码结构简单。内存占用:需要存储模板数组和中间结果存储。
可扩展:可轻松添加同步头、状态字等更复杂的模板。对时钟漂移敏感:如果实际位周期与模板不匹配,性能下降。

实现逻辑

  • 模板生成(01 的曼彻斯特编码波形)
  • 滑动窗口相关匹配
  • 同步头检测(3 位同步头)
  • 帧解析
  • 奇校验验证
  • 完整的解码流程演示

python实现

在这里插入图片描述

  • 波形协议为:【电路笔记 通信】:数字式时分制指令响应型多路传输数据总线 1553协议(MIL-STD-1553) & 289A-97协议
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple, Optional
from scipy import signal# ========================
# 配置参数
# ========================
# 采样信息
SAMPLE_RATE = 10_000_000      # 10 MHz 采样率
BIT_RATE = 1_000_000          # 1 Mbps (MIL-STD-1553B)
HALF_BIT_SAMPLES = SAMPLE_RATE // BIT_RATE // 2  # 半位采样点数   5
BIT_SAMPLES = 2 * HALF_BIT_SAMPLES               # 一位总采样点数 10# 信号电平(模拟ADC值)
HIGH_LEVEL = 10000
LOW_LEVEL = -10000# 帧结构   
ALL_DATA_BITS = 17  # 16个数据位和一个校验位
SYNC_LENGTH = 15 # 同步头# ========================
# 模板生成
# ========================
def generate_manchester_templates() -> Tuple[np.ndarray, np.ndarray]:"""生成 MIL-STD-1553B 曼彻斯特编码模板- '1': 正脉冲 + 负脉冲  -> [HIGH, LOW]- '0': 负脉冲 + 正脉冲  -> [LOW, HIGH]"""template_1 = np.concatenate([np.full(HALF_BIT_SAMPLES, HIGH_LEVEL),np.full(HALF_BIT_SAMPLES, LOW_LEVEL)]) # []template_0 = np.concatenate([np.full(HALF_BIT_SAMPLES, LOW_LEVEL),np.full(HALF_BIT_SAMPLES, HIGH_LEVEL)])return template_0, template_1# 生成同步头模板
def generate_sync_template(template_1: np.ndarray, num_sync_bits: int = 3) -> np.ndarray:"""生成同步头模板"""return np.concatenate([np.zeros(SYNC_LENGTH, dtype=int), np.ones(SYNC_LENGTH, dtype=int)]) # 共30位,15个低位+15个高位。# ========================
# 相关匹配查找同步头
# ========================
def find_sync_start_correlation(waveform: np.ndarray, length: int, sync_template: np.ndarray, threshold: float = 75000.0) -> int:"""使用互相关查找同步头位置,仅当相关值超过阈值且为局部最大值时才认为是有效同步头。参数:waveform (np.ndarray): 输入的波形数据length (int): 搜索范围长度(从 waveform 开头搜索)sync_template (np.ndarray): 同步头模板threshold (float): 相关值阈值返回:int: 第一个有效同步头的起始位置(索引),未找到返回 -1"""# 截取搜索范围search_wave = waveform[:length]# 计算互相关,mode='valid' 表示只有完全重叠的部分corr = np.correlate(search_wave, sync_template, mode='valid')# 如果相关结果为空,直接返回-1if len(corr) == 0:return -1# 存储局部最大值的位置peak_positions = []# 遍历相关值,寻找局部最大值for i in range(1, len(corr) - 1):  # 忽略边界if corr[i] > threshold:  # 先判断是否超过阈值# 判断是否为局部最大值(峰值)if corr[i] > corr[i-1] and corr[i] > corr[i+1]:peak_positions.append(i)# 如果没有找到任何峰值,返回 -1if not peak_positions:return -1# 返回第一个检测到的峰值位置(对应 waveform 的起始索引)# 注意:corr[i] 对应 waveform[i] 到 waveform[i + len(template)-1]return peak_positions[0]# ========================
# 相关匹配解码单个位
# ========================
def match_bit(signal_segment: np.ndarray, template_0: np.ndarray, template_1: np.ndarray) -> int:"""使用点积相关匹配判断是 0 还是 1返回: 0, 1, 或 -1(无效)"""if len(signal_segment) != len(template_0):raise ValueError("Signal segment length does not match template length")corr_0 = np.dot(signal_segment, template_0)corr_1 = np.dot(signal_segment, template_1)if corr_0 > corr_1:return 0elif corr_1 > corr_0:return 1else:return -1  # 平局,视为错误def match_bit_r_corr(signal_segment: np.ndarray, template_0: np.ndarray, template_1: np.ndarray) -> int:"""使用点积相关匹配判断是 0 还是 1返回: 0, 1, 或 -1(无效)"""if len(signal_segment) != len(template_0):raise ValueError("Signal segment length does not match template length")corr_0 = np.dot(signal_segment, template_0) # -1*(signal_segment[0]+signal_segment[1]+signal_segment[2]+signal_segment[3]+signal_segment[4])+(signal_segment[5]+signal_segment[6]+signal_segment[7]+signal_segment[8]+signal_segment[9])corr_1 = np.dot(signal_segment, template_1) # (signal_segment[0]+signal_segment[1]+signal_segment[2]+signal_segment[3]+signal_segment[4])-1*(signal_segment[5]+signal_segment[6]+signal_segment[7]+signal_segment[8]+signal_segment[9])if corr_0 > corr_1:return 0, corr_0elif corr_1 > corr_0:return 1, corr_1else:return -1,0  # 平局,视为错误# ========================
# 滑动窗口法寻找极值位
# ========================
def find_peaks_sliding_window(data, window_size=11, threshold=0.3):"""使用滑动窗口检测局部极值参数:data: 输入时间序列window_size: 滑动窗口大小(必须为奇数)threshold: 相对幅度阈值返回:positive_peaks: 正峰值位置negative_peaks: 负峰值位置"""if window_size % 2 == 0:window_size += 1  # 确保窗口大小为奇数half_window = window_size // 2positive_peaks = []negative_peaks = []# 计算全局统计量用于阈值data_range = np.max(data) - np.min(data)abs_threshold = threshold * data_rangefor i in range(half_window, len(data) - half_window):window = data[i - half_window:i + half_window + 1]center_value = data[i]# 检查是否是正峰值if center_value == np.max(window) and center_value > np.min(window) + abs_threshold:# 确保是严格的局部最大值is_peak = Truefor j in range(1, half_window + 1):if data[i - j] >= center_value or data[i + j] >= center_value:is_peak = Falsebreakif is_peak:positive_peaks.append(i)# 检查是否是负峰值elif center_value == np.min(window) and center_value < np.max(window) - abs_threshold:# 确保是严格的局部最小值is_valley = Truefor j in range(1, half_window + 1):if data[i - j] <= center_value or data[i + j] <= center_value:is_valley = Falsebreakif is_valley:negative_peaks.append(i)return np.array(positive_peaks), np.array(negative_peaks)# ========================
# 滑动窗口解码位流
# ========================
def decode_bits_from_waveform(waveform: np.ndarray, template_0: np.ndarray, template_1: np.ndarray) -> List[int]:"""从波形中解码出位流"""bits = []i = 0j = 0while (i <= len(waveform) - BIT_SAMPLES) & (j<17):segment = waveform[i:i + BIT_SAMPLES] bit,corr = match_bit_r_corr(segment, template_0, template_1) # match_bit时平局,视为错误,返回-1if bit != -1:bits.append(bit)else:bits.append(-1)  # 标记错误位i += BIT_SAMPLES  # 移动一个位j += 1            # 解码的数据个数加一 return bits# ========================
# 数据校验  如果符合偶数校验则返回True,否则返回False
# ========================
def even_parity_check(decoded_bits):"""对给定的二进制列表进行偶数奇偶校验。参数:decoded_bits (list): 二进制位列表(0或1),最后一位被认为是奇偶校验位返回:bool: 如果符合偶数奇偶校验则返回True,否则返回False"""# 计算除了校验位外的所有1的数目count_ones = sum(decoded_bits[:-1])# 判断包括校验位在内的所有1的数目是否为偶数return (count_ones + decoded_bits[-1]) % 2 == 0# ========================
# 绘制数据
# ========================
def plot_data(data):plt.figure(figsize=(10, 5)) # 设置图表大小plt.plot(data, marker='o') # 绘制数据点,添加标记使数据点更明显plt.title('Data Plot') # 添加标题plt.xlabel('Index') # X轴标签plt.ylabel('Value') # Y轴标签plt.grid(True) # 显示网格线# 显示图表plt.show()print(data) # 绘制同步头的位置
def plot_data_mark(data,sync_positions):"""绘制数据,并在 sync_positions 指定的位置添加垂直线标记(如同步头位置)参数:data (list or np.ndarray): 要绘制的数据sync_positions (list of int): 需要标记的位置索引列表,例如 [150, 450]"""plt.figure(figsize=(12, 6))  # 设置图表大小plt.plot(data, marker='o', markersize=3, linewidth=1, label='Signal')  # 绘制数据plt.axvline(x=sync_positions, color='red', linestyle='--', linewidth=2, alpha=0.8)plt.axvline(x=sync_positions+30, color='red', linestyle='--', linewidth=2, alpha=0.8)plt.title('Data Plot with Sync Markers')  # 添加标题plt.xlabel('Sample Index')                # X轴标签plt.ylabel('Amplitude')                   # Y轴标签plt.grid(True, alpha=0.6)                 # 显示网格线# 图例:只显示一次 "Sync" 标签handles, labels = plt.gca().get_legend_handles_labels()if handles:plt.legend()# 显示图表plt.tight_layout()plt.show()# 输出数据(可选)print("Data:", data)print("Sync positions marked:", sync_positions)# 绘制多个同步头的位置
def plot_data_mark_mut(data,sync_positions):"""绘制数据,并在 sync_positions 指定的位置添加垂直线标记(如同步头位置)参数:data (list or np.ndarray): 要绘制的数据sync_positions (list of int): 需要标记的位置索引列表,例如 [150, 450]"""plt.figure(figsize=(12, 6))  # 设置图表大小plt.plot(data, marker='o', markersize=3, linewidth=1, label='Signal')  # 绘制数据for pos in sync_positions:plt.axvline(x=pos, color='red', linestyle='--', linewidth=2, alpha=0.8)plt.axvline(x=pos+30, color='red', linestyle='--', linewidth=2, alpha=0.8)plt.title('Data Plot with Sync Markers')  # 添加标题plt.xlabel('Sample Index')                # X轴标签plt.ylabel('Amplitude')                   # Y轴标签plt.grid(True, alpha=0.6)                 # 显示网格线# 图例:只显示一次 "Sync" 标签handles, labels = plt.gca().get_legend_handles_labels()if handles:plt.legend()# 显示图表plt.tight_layout()plt.show()# 输出数据(可选)print("Data:", data)print("Sync positions marked:", sync_positions)def plot_data_mark_mut_dot(data, sync_positions):"""绘制数据,并在 sync_positions 指定的位置添加垂直线标记(如同步头位置)在第二个红线后以10为距离添加标记,直到下一个第一条红线为止,标记显示对应index"""plt.figure(figsize=(12, 6))  # 设置图表大小plt.plot(data, marker='o', markersize=3, linewidth=1, label='Signal')  # 绘制数据# 先绘制所有红线red_lines = []for pos in sync_positions:# 第一条红线位置first_line = pos# 第二条红线位置second_line = pos + 30plt.axvline(x=first_line, color='red', linestyle='--', linewidth=2, alpha=0.8)plt.axvline(x=second_line, color='red', linestyle='--', linewidth=2, alpha=0.8)red_lines.append((first_line, second_line))# 为每个区间添加浮动文本标记for i, (first_line, second_line) in enumerate(red_lines):# 确定当前区间的结束位置(下一个第一条红线)if i < len(red_lines) - 1:end_pos = red_lines[i+1][0]  # 下一个第一条红线位置else:end_pos = len(data) - 1  # 最后一个区间以数据长度为终点# 从第二条红线后10个位置开始start_pos = second_line + 10# 确保起始位置不超过结束位置if start_pos > end_pos:continue# 以10为间隔添加浮动标记(显示index)current_pos = start_poswhile current_pos <= end_pos:# 获取当前位置的数据值(用于定位文本)data_value = data[current_pos]# 添加浮动文本标记(显示当前index)# 位置:在点的右下方(可通过xytext调整偏移)# 仅第一个标记添加图例标签label = 'Index Mark' if (i == 0 and current_pos == start_pos) else ""if(current_pos in score_dict):plt.text(current_pos, data_value,  # 文本锚点位置(对应数据点)f'{score_dict[current_pos]}',         # 显示的文本内容(index值)fontsize=8,               # 文本大小color='blue',             # 文本颜色ha='left', va='bottom',   # 水平/垂直对齐方式bbox=dict(facecolor='white', alpha=0.3, boxstyle='round,pad=0.3'),  # 白色背景框label=label)# print("----",current_pos)current_pos += 10plt.title('Data Plot with Sync Markers')  # 添加标题plt.xlabel('Sample Index')                # X轴标签plt.ylabel('Amplitude')                   # Y轴标签plt.grid(True, alpha=0.6)                 # 显示网格线# 处理图例,确保只显示唯一标签handles, labels = plt.gca().get_legend_handles_labels()by_label = dict(zip(labels, handles))plt.legend(by_label.values(), by_label.keys())# 显示图表plt.tight_layout()plt.show()# 输出数据(可选)print("Data:", data)print("Sync positions marked:", sync_positions)# ========================
# 数据滤波
# ========================
def lowpass_filter_scipy(data, cutoff_freq, sampling_rate, filter_type='butter', order=4):"""使用SciPy对NumPy数组进行低通滤波参数:data: numpy数组,输入信号cutoff_freq: float, 截止频率 (Hz)sampling_rate: float, 采样频率 (Hz)filter_type: str, 滤波器类型 ('butter', 'cheby1', 'bessel')order: int, 滤波器阶数返回:filtered_data: 滤波后的numpy数组"""# 计算归一化截止频率 (Nyquist频率的倍数)nyquist_freq = 0.5 * sampling_ratenormal_cutoff = cutoff_freq / nyquist_freqif filter_type == 'butter':b, a = signal.butter(order, normal_cutoff, btype='low', analog=False)elif filter_type == 'cheby1':b, a = signal.cheby1(order, 0.5, normal_cutoff, btype='low', analog=False)elif filter_type == 'bessel':b, a = signal.bessel(order, normal_cutoff, btype='low', analog=False)else:raise ValueError("不支持的滤波器类型")# 应用滤波器filtered_data = signal.filtfilt(b, a, data)return filtered_datadef moving_average_filter(data, window_size):"""使用移动平均进行低通滤波参数:data: numpy数组window_size: int, 窗口大小返回:filtered_data: 滤波后的数组"""if window_size <= 1:return data.copy()# 使用卷积实现移动平均window = np.ones(window_size) / window_sizefiltered_data = np.convolve(data, window, mode='same')return filtered_datares_dict = {}
score_dict = {}if __name__ == "__main__":sync_head_num = 0 # 查找到同步头的个数paritiy_num = 0paritiy_list = []not_par_num = 0not_par_list = []data = np.loadtxt('***.dat', skiprows=1) # 加载数据文件,跳过第一行# 预处理数据data = -1*data print(data)startindex = 0data = data[startindex:startindex+49000] # 100000# 0.预处理# data = moving_average_filter(data,2)data = lowpass_filter_scipy(data,40000,100000)# 1. 生成模板template_0, template_1 = generate_manchester_templates()sync_template = generate_sync_template(template_1, 3)# 2. 波形数据test_wave = data# plot_data(test_wave)data_len = data.sizeindex = 0sync_list = [] # 标记同步头位置的列表while(index<(data_len-1000)): # 留1000个缓冲空间# 查找同步头sync_positions =  -1window_size = 300window_overlap = 100while(sync_positions ==-1):index = index + window_size - window_overlap # 窗口重叠segment = test_wave[index:-1]if(segment.size != 0):sync_positions = find_sync_start_correlation(segment, window_size, sync_template, threshold= 55000.0)else:print("已遍历所有数据")breakif(segment.size != 0):   sync_head_num = sync_head_num +1print("第一个同步头的位置",index+sync_positions)    # plot_data_mark(test_wave[0:-1],index+sync_positions) # python 不支持函数重载sync_list.append(index+sync_positions)# 3. 解码位流data = test_wave[index+sync_positions+30:index+sync_positions+30+170] print("+++",index+sync_positions+30)decoded_bits = decode_bits_from_waveform(data, template_0, template_1) # 解完17个就返回res_dict[index+sync_positions] = decoded_bitsvalue = index+sync_positions # +30+10+0*10# score_dict[value] = r_list# 4. 对list数据decoded_bits进行奇偶校验is_valid = even_parity_check(decoded_bits)print(f"奇偶校验{'通过' if is_valid else '未通过'}")if is_valid:paritiy_num = paritiy_num + 1paritiy_list.append(index+sync_positions)else:not_par_num = not_par_num + 1not_par_list.append(index+sync_positions)print("-------------统计信息------------")print("找到同步头",sync_head_num)print("奇偶校验未通过个数",paritiy_num,paritiy_list)print("奇偶校验通过个数",not_par_num,not_par_list)print("---------------------------------")print("-------------绘制同步头位置------------")plot_data_mark_mut_dot(test_wave,sync_list)print("---------------------------------")

互相关解码

import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple, Optional
from scipy import signal# ========================
# 配置参数
# ========================
# 采样信息
SAMPLE_RATE = 10_000_000      # 10 MHz 采样率
BIT_RATE = 1_000_000          # 1 Mbps (MIL-STD-1553B)
HALF_BIT_SAMPLES = SAMPLE_RATE // BIT_RATE // 2  # 半位采样点数   5
BIT_SAMPLES = 2 * HALF_BIT_SAMPLES               # 一位总采样点数 10# 信号电平(模拟ADC值)
HIGH_LEVEL = 10000
LOW_LEVEL = -10000# 帧结构   
ALL_DATA_BITS = 17  # 16个数据位和一个校验位
SYNC_LENGTH = 15 # 同步头# ========================
# 模板生成
# ========================
def generate_manchester_templates() -> Tuple[np.ndarray, np.ndarray]:"""生成 MIL-STD-1553B 曼彻斯特编码模板- '1': 正脉冲 + 负脉冲  -> [HIGH, LOW]- '0': 负脉冲 + 正脉冲  -> [LOW, HIGH]"""template_1 = np.concatenate([np.full(HALF_BIT_SAMPLES, HIGH_LEVEL),np.full(HALF_BIT_SAMPLES, LOW_LEVEL)]) # []template_0 = np.concatenate([np.full(HALF_BIT_SAMPLES, LOW_LEVEL),np.full(HALF_BIT_SAMPLES, HIGH_LEVEL)])return template_0, template_1# 生成同步头模板
def generate_sync_template(template_1: np.ndarray, num_sync_bits: int = 3) -> np.ndarray:"""生成同步头模板"""return np.concatenate([np.zeros(SYNC_LENGTH, dtype=int), np.ones(SYNC_LENGTH, dtype=int)]) # 共30位,15个低位+15个高位。# ========================
# 相关匹配查找同步头
# ========================
def find_sync_start_correlation(waveform: np.ndarray, length: int, sync_template: np.ndarray, threshold: float = 75000.0) -> int:"""使用互相关查找同步头位置,仅当相关值超过阈值且为局部最大值时才认为是有效同步头。参数:waveform (np.ndarray): 输入的波形数据length (int): 搜索范围长度(从 waveform 开头搜索)sync_template (np.ndarray): 同步头模板threshold (float): 相关值阈值返回:int: 第一个有效同步头的起始位置(索引),未找到返回 -1"""# 截取搜索范围search_wave = waveform[:length]# 计算互相关,mode='valid' 表示只有完全重叠的部分corr = np.correlate(search_wave, sync_template, mode='valid')# 如果相关结果为空,直接返回-1if len(corr) == 0:return -1# 存储局部最大值的位置peak_positions = []# 遍历相关值,寻找局部最大值for i in range(1, len(corr) - 1):  # 忽略边界if corr[i] > threshold:  # 先判断是否超过阈值# 判断是否为局部最大值(峰值)if corr[i] > corr[i-1] and corr[i] > corr[i+1]:peak_positions.append(i)# 如果没有找到任何峰值,返回 -1if not peak_positions:return -1# 返回第一个检测到的峰值位置(对应 waveform 的起始索引)# 注意:corr[i] 对应 waveform[i] 到 waveform[i + len(template)-1]return peak_positions[0]# ========================
# 相关匹配解码单个位
# ========================
def match_bit(signal_segment: np.ndarray, template_0: np.ndarray, template_1: np.ndarray) -> int:"""使用点积相关匹配判断是 0 还是 1返回: 0, 1, 或 -1(无效)"""if len(signal_segment) != len(template_0):raise ValueError("Signal segment length does not match template length")corr_0 = np.dot(signal_segment, template_0)corr_1 = np.dot(signal_segment, template_1)if corr_0 > corr_1:return 0elif corr_1 > corr_0:return 1else:return -1  # 平局,视为错误def match_bit_r_corr(signal_segment: np.ndarray, template_0: np.ndarray, template_1: np.ndarray) -> int:"""使用点积相关匹配判断是 0 还是 1返回: 0, 1, 或 -1(无效)"""if len(signal_segment) != len(template_0):raise ValueError("Signal segment length does not match template length")corr_0 = np.dot(signal_segment, template_0) # -1*(signal_segment[0]+signal_segment[1]+signal_segment[2]+signal_segment[3]+signal_segment[4])+(signal_segment[5]+signal_segment[6]+signal_segment[7]+signal_segment[8]+signal_segment[9])corr_1 = np.dot(signal_segment, template_1) # (signal_segment[0]+signal_segment[1]+signal_segment[2]+signal_segment[3]+signal_segment[4])-1*(signal_segment[5]+signal_segment[6]+signal_segment[7]+signal_segment[8]+signal_segment[9])if corr_0 > corr_1:return 0, corr_0elif corr_1 > corr_0:return 1, corr_1else:return -1,0  # 平局,视为错误# ========================
# 滑动窗口法寻找极值位
# ========================
def find_peaks_sliding_window(data, window_size=11, threshold=0.3):"""使用滑动窗口检测局部极值参数:data: 输入时间序列window_size: 滑动窗口大小(必须为奇数)threshold: 相对幅度阈值返回:positive_peaks: 正峰值位置negative_peaks: 负峰值位置"""if window_size % 2 == 0:window_size += 1  # 确保窗口大小为奇数half_window = window_size // 2positive_peaks = []negative_peaks = []# 计算全局统计量用于阈值data_range = np.max(data) - np.min(data)abs_threshold = threshold * data_rangefor i in range(half_window, len(data) - half_window):window = data[i - half_window:i + half_window + 1]center_value = data[i]# 检查是否是正峰值if center_value == np.max(window) and center_value > np.min(window) + abs_threshold:# 确保是严格的局部最大值is_peak = Truefor j in range(1, half_window + 1):if data[i - j] >= center_value or data[i + j] >= center_value:is_peak = Falsebreakif is_peak:positive_peaks.append(i)# 检查是否是负峰值elif center_value == np.min(window) and center_value < np.max(window) - abs_threshold:# 确保是严格的局部最小值is_valley = Truefor j in range(1, half_window + 1):if data[i - j] <= center_value or data[i + j] <= center_value:is_valley = Falsebreakif is_valley:negative_peaks.append(i)return np.array(positive_peaks), np.array(negative_peaks)def find_alternating_peaks(time_series):"""查找时间序列中交替出现的正负峰值,要求正负峰值间数据单调参数:time_series: 一维数组或列表,代表输入的时间序列返回:peaks: 列表,包含检测到的峰值信息,每个元素为元组(位置索引, 峰值类型, 峰值大小)其中峰值类型为'positive'(正峰)或'negative'(负峰)"""if len(time_series) < 3:return []  # 太短的序列无法形成峰值# 转换为numpy数组便于处理ts = np.asarray(time_series)peaks = []zero_peak_type = 'positive'if ts[0]<0:zero_peak_type = 'negative'peaks.append((0, zero_peak_type, ts[0]))n = len(ts)i = 1  # 从第二个元素开始检查next_expected_type = 'negative' if zero_peak_type == 'positive' else 'positive'while i < n - 1:# 检查是否符合预期类型的峰值is_positive_peak = (ts[i] > ts[i-1] and ts[i] > ts[i+1])is_negative_peak = (ts[i] < ts[i-1] and ts[i] < ts[i+1])if (i>=3) & (i<=158):is_positive_peak = (ts[i] > ts[i-1] and ts[i] > ts[i+1]) & (ts[i] > ts[i-2] and ts[i] > ts[i+2])is_negative_peak = (ts[i] < ts[i-1] and ts[i] < ts[i+1]) & (ts[i] < ts[i-2] and ts[i] < ts[i+2])if (next_expected_type == 'positive' and is_positive_peak) or \(next_expected_type == 'negative' and is_negative_peak):peaks.append((i, next_expected_type, ts[i]))next_expected_type = 'negative' if next_expected_type == 'positive' else 'positive'i += 1return peaks # 感觉还是要用滑动窗口方法# ========================
# 滑动窗口解码位流
# ========================
def decode_bits_from_waveform(waveform: np.ndarray, template_0: np.ndarray, template_1: np.ndarray) -> List[int]:"""从波形中解码出位流"""bits = []i = 0j = 0while (i <= len(waveform) - BIT_SAMPLES) & (j<17):segment = waveform[i:i + BIT_SAMPLES] bit,corr = match_bit_r_corr(segment, template_0, template_1) # match_bit时平局,视为错误,返回-1if bit != -1:bits.append(bit)else:bits.append(-1)  # 标记错误位i += BIT_SAMPLES  # 移动一个位j += 1            # 解码的数据个数加一 return bitsdef decode_bits_from_waveform_r_peaks(waveform: np.ndarray, template_0: np.ndarray, template_1: np.ndarray):"""从波形中解码出位流"""bits = []i = 0corr = np.correlate(waveform, template_0, mode='valid')res = find_alternating_peaks(corr) # 峰值数据if res[0][1] == 'positive':bits.append(0)else:bits.append(1)flag = Falsefor i in range(1,len(res)):if(flag):if res[i][1] == 'positive':bits.append(0)else:bits.append(1)flag = not flagif ((res[i][0] - res[i-1][0]) > 6+1):flag = Falseif res[i][1] == 'positive':bits.append(0)else:bits.append(1)else:passreturn bits[0:17],res# ========================
# 数据校验  如果符合偶数校验则返回True,否则返回False
# ========================
def even_parity_check(decoded_bits):"""对给定的二进制列表进行偶数奇偶校验。参数:decoded_bits (list): 二进制位列表(0或1),最后一位被认为是奇偶校验位返回:bool: 如果符合偶数奇偶校验则返回True,否则返回False"""# 计算除了校验位外的所有1的数目count_ones = sum(decoded_bits[:-1])# 判断包括校验位在内的所有1的数目是否为偶数return (count_ones + decoded_bits[-1]) % 2 == 0# ========================
# 绘制数据
# ========================
def plot_data(data):plt.figure(figsize=(10, 5)) # 设置图表大小plt.plot(data, marker='o') # 绘制数据点,添加标记使数据点更明显plt.title('Data Plot') # 添加标题plt.xlabel('Index') # X轴标签plt.ylabel('Value') # Y轴标签plt.grid(True) # 显示网格线# 显示图表plt.show()print(data) # 绘制同步头的位置
def plot_data_mark(data,sync_positions):"""绘制数据,并在 sync_positions 指定的位置添加垂直线标记(如同步头位置)参数:data (list or np.ndarray): 要绘制的数据sync_positions (list of int): 需要标记的位置索引列表,例如 [150, 450]"""plt.figure(figsize=(12, 6))  # 设置图表大小plt.plot(data, marker='o', markersize=3, linewidth=1, label='Signal')  # 绘制数据plt.axvline(x=sync_positions, color='red', linestyle='--', linewidth=2, alpha=0.8)plt.axvline(x=sync_positions+30, color='red', linestyle='--', linewidth=2, alpha=0.8)plt.title('Data Plot with Sync Markers')  # 添加标题plt.xlabel('Sample Index')                # X轴标签plt.ylabel('Amplitude')                   # Y轴标签plt.grid(True, alpha=0.6)                 # 显示网格线# 图例:只显示一次 "Sync" 标签handles, labels = plt.gca().get_legend_handles_labels()if handles:plt.legend()# 显示图表plt.tight_layout()plt.show()# 输出数据(可选)print("Data:", data)print("Sync positions marked:", sync_positions)# 绘制多个同步头的位置
def plot_data_mark_mut(data,sync_positions):"""绘制数据,并在 sync_positions 指定的位置添加垂直线标记(如同步头位置)参数:data (list or np.ndarray): 要绘制的数据sync_positions (list of int): 需要标记的位置索引列表,例如 [150, 450]"""plt.figure(figsize=(12, 6))  # 设置图表大小plt.plot(data, marker='o', markersize=3, linewidth=1, label='Signal')  # 绘制数据for pos in sync_positions:plt.axvline(x=pos, color='red', linestyle='--', linewidth=2, alpha=0.8)plt.axvline(x=pos+30, color='red', linestyle='--', linewidth=2, alpha=0.8)plt.title('Data Plot with Sync Markers')  # 添加标题plt.xlabel('Sample Index')                # X轴标签plt.ylabel('Amplitude')                   # Y轴标签plt.grid(True, alpha=0.6)                 # 显示网格线# 图例:只显示一次 "Sync" 标签handles, labels = plt.gca().get_legend_handles_labels()if handles:plt.legend()# 显示图表plt.tight_layout()plt.show()# 输出数据(可选)print("Data:", data)print("Sync positions marked:", sync_positions)def plot_data_mark_mut_dot(data, sync_positions):"""绘制数据,并在 sync_positions 指定的位置添加垂直线标记(如同步头位置)在第二个红线后以10为距离添加标记,直到下一个第一条红线为止,标记显示对应index"""plt.figure(figsize=(12, 6))  # 设置图表大小plt.plot(data, marker='o', markersize=3, linewidth=1, label='Signal')  # 绘制数据# 先绘制所有红线red_lines = []for pos in sync_positions:# 第一条红线位置first_line = pos# 第二条红线位置second_line = pos + 30plt.axvline(x=first_line, color='red', linestyle='--', linewidth=2, alpha=0.8)plt.axvline(x=second_line, color='red', linestyle='--', linewidth=2, alpha=0.8)red_lines.append((first_line, second_line))# 为每个区间添加浮动文本标记for i, (first_line, second_line) in enumerate(red_lines):# 确定当前区间的结束位置(下一个第一条红线)if i < len(red_lines) - 1:end_pos = red_lines[i+1][0]  # 下一个第一条红线位置else:end_pos = len(data) - 1  # 最后一个区间以数据长度为终点# 从第二条红线后10个位置开始start_pos = second_line + 10# 确保起始位置不超过结束位置if start_pos > end_pos:continue# 以10为间隔添加浮动标记(显示index)current_pos = start_poswhile current_pos <= end_pos:# 获取当前位置的数据值(用于定位文本)data_value = data[current_pos]# 添加浮动文本标记(显示当前index)# 位置:在点的右下方(可通过xytext调整偏移)# 仅第一个标记添加图例标签label = 'Index Mark' if (i == 0 and current_pos == start_pos) else ""if(current_pos in score_dict):plt.text(current_pos, data_value,  # 文本锚点位置(对应数据点)f'{score_dict[current_pos]}',         # 显示的文本内容(index值)fontsize=8,               # 文本大小color='blue',             # 文本颜色ha='left', va='bottom',   # 水平/垂直对齐方式bbox=dict(facecolor='white', alpha=0.3, boxstyle='round,pad=0.3'),  # 白色背景框label=label)# print("----",current_pos)current_pos += 10plt.title('Data Plot with Sync Markers')  # 添加标题plt.xlabel('Sample Index')                # X轴标签plt.ylabel('Amplitude')                   # Y轴标签plt.grid(True, alpha=0.6)                 # 显示网格线# 处理图例,确保只显示唯一标签handles, labels = plt.gca().get_legend_handles_labels()by_label = dict(zip(labels, handles))plt.legend(by_label.values(), by_label.keys())# 显示图表plt.tight_layout()plt.show()# 输出数据(可选)print("Data:", data)print("Sync positions marked:", sync_positions)# ========================
# 数据滤波
# ========================
def lowpass_filter_scipy(data, cutoff_freq, sampling_rate, filter_type='butter', order=4):"""使用SciPy对NumPy数组进行低通滤波参数:data: numpy数组,输入信号cutoff_freq: float, 截止频率 (Hz)sampling_rate: float, 采样频率 (Hz)filter_type: str, 滤波器类型 ('butter', 'cheby1', 'bessel')order: int, 滤波器阶数返回:filtered_data: 滤波后的numpy数组"""# 计算归一化截止频率 (Nyquist频率的倍数)nyquist_freq = 0.5 * sampling_ratenormal_cutoff = cutoff_freq / nyquist_freqif filter_type == 'butter':b, a = signal.butter(order, normal_cutoff, btype='low', analog=False)elif filter_type == 'cheby1':b, a = signal.cheby1(order, 0.5, normal_cutoff, btype='low', analog=False)elif filter_type == 'bessel':b, a = signal.bessel(order, normal_cutoff, btype='low', analog=False)else:raise ValueError("不支持的滤波器类型")# 应用滤波器filtered_data = signal.filtfilt(b, a, data)return filtered_datadef moving_average_filter(data, window_size):"""使用移动平均进行低通滤波参数:data: numpy数组window_size: int, 窗口大小返回:filtered_data: 滤波后的数组"""if window_size <= 1:return data.copy()# 使用卷积实现移动平均window = np.ones(window_size) / window_sizefiltered_data = np.convolve(data, window, mode='same')return filtered_datares_dict = {}
score_dict = {}if __name__ == "__main__":sync_head_num = 0 # 查找到同步头的个数paritiy_num = 0paritiy_list = []not_par_num = 0not_par_list = []data = np.loadtxt('***.dat', skiprows=1) # 加载数据文件,跳过第一行# 预处理数据data = -1*data print(data)startindex = 0data = data[startindex:startindex+49000] # 100000# 0.预处理# data = moving_average_filter(data,2)data = lowpass_filter_scipy(data,40000,100000)# 1. 生成模板template_0, template_1 = generate_manchester_templates()sync_template = generate_sync_template(template_1, 3)# 2. 波形数据test_wave = data# plot_data(test_wave)data_len = data.sizeindex = 0sync_list = [] # 标记同步头位置的列表while(index<(data_len-1000)): # 留1000个缓冲空间# 查找同步头sync_positions =  -1window_size = 300window_overlap = 100while(sync_positions ==-1):index = index + window_size - window_overlap # 窗口重叠segment = test_wave[index:-1]if(segment.size != 0):sync_positions = find_sync_start_correlation(segment, window_size, sync_template, threshold= 55000.0)else:print("已遍历所有数据")breakif(segment.size != 0):   sync_head_num = sync_head_num +1print("第一个同步头的位置",index+sync_positions)    # plot_data_mark(test_wave[0:-1],index+sync_positions) # python 不支持函数重载sync_list.append(index+sync_positions)# 3. 解码位流data = test_wave[index+sync_positions+30:index+sync_positions+30+170] print("+++",index+sync_positions+30)decoded_bits,r_list = decode_bits_from_waveform_r_peaks(data, template_0, template_1) # 解完17个就返回res_dict[index+sync_positions] = decoded_bitsvalue = index+sync_positions # +30+10+0*10score_dict[value] = r_list# 4. 对list数据decoded_bits进行奇偶校验is_valid = even_parity_check(decoded_bits)print(f"奇偶校验{'通过' if is_valid else '未通过'}")if is_valid:paritiy_num = paritiy_num + 1paritiy_list.append(index+sync_positions)else:not_par_num = not_par_num + 1not_par_list.append(index+sync_positions)print("-------------统计信息------------")print("找到同步头",sync_head_num)print("奇偶校验未通过个数",paritiy_num,paritiy_list)print("奇偶校验通过个数",not_par_num,not_par_list)print("---------------------------------")print("-------------绘制同步头位置------------")plot_data_mark_mut_dot(test_wave,sync_list)print("---------------------------------")
http://www.dtcms.com/a/582300.html

相关文章:

  • 中国建设银行河北省门户网站网络营销渠道策略
  • 关于网站整体实现流程有哪些
  • Docker 核心技术原理(2025年演进趋势与生产实践)
  • 网站界面的版式架构做adsense对网站有什么要求
  • 【观察】洗地机销量连续三年全球第一,添可何以“洗”卷全球?
  • 网站制作泉州公司网络服务提供者接到权利人的通知后
  • 在Ubuntu中安装并配置ssh
  • 个人网站开发赚钱方向pc端自适应网站模板
  • React“组件即函数”
  • 黄冈做网站的公司哪家好企业免费自助建站系统
  • 专业的免费网站建设哪家网站留言板模板
  • 【Web SEO】前端性能优化+SEO具体实践手册
  • Unittest接口测试生成报告和日志
  • Vue3 状态管理 + Pinia
  • 2025重新出发!中小物流仓配一体化平台的技术选型建设手记
  • 记录 RTPEngine Dockerfile 的调试过程
  • 阜新网站seo做软装的网站
  • 微信公众号微网站开发中国装修网官方网站
  • 学做预算网站wordpress编辑器
  • 盲盒抽赏小程序爬塔玩法分析:技术实现 + 留存破局,打造长效抽赏生态
  • 基于springboot的学院班级回忆录的设计与实现
  • 重庆专业网站建设公司响应式框架
  • day10(11.7)——leetcode面试经典150
  • 特定网站开发个人主页网站设计
  • flink 1.20 物化表(Materialized Tables)
  • html网站建设流程图只做一页的网站多少钱
  • 百度上能收到的企业名称网站怎么做wordpress theme api
  • asp.net网站开发案例教程做图片模板
  • 大庆门户网站做外贸接私单的网站
  • 企业项目级医院随访系统源码,患者随访管理系统,技术框架:Java+Spring boot,Vue,Ant-Design+MySQL5