多通道信号采集分析系统 - 01 功能分解与采样子系统
1.将采样与分析进程分离
注意,永远不要把采样和分析耦合进一个程序,这样做有些非常糟糕的不良后果:
- 更难调试。你始终需要真实数据进行调试。
- demo程序的逻辑更不好处理,你要在解析程序中加入命令行参数,或者使用编译宏分割为不同的程序。
- 故障复原,其他同事的联调也变得麻烦。
- 采样程序有时会涉及三方设备,比如会遭遇设备不在线,或者访问超时。如果放在一起会耦合到一块儿。设计不良时很容易出现,故障不必要的扩散。
- 采样程序可能会涉及到相关IO设备的频繁操作,资源的访问和释放。如果涉及不良会有内存访问问题,隔离到独立的进程会更容易处理。
2.采样程序的设计思路
2.1 工程采样配置文件
根据工程的采样需求,整理采样配置文件,推荐.json或者.xml这种自解释的格式。
-
是否涉及到对多个采集设备的采样。
-
是否涉及对同一个采样设备不同类型的通道的采样。
-
是否涉及对不同采样对象的差异化采样策略【比如saps, ptps的调整】
-
各种不同的传感器型号是否一致,采集参数是否一致。
2.2 采样程序的职能定义
无论项目的采样计划是否涉及到多组,不同配置参数的采集设备的采样。建议采样程序的职能始终设置为,针对单一采样设备,单一采样参数的采样。把整体采样计划的执行,各种原子采样动作的调度,放到采样程序的外部。
简单的固定间距长短周期采样,可以直接指定多组定时任务,比如:
#15秒一次短采样,60秒一组长采样的序列,外加15秒一次的温度采样
watch -n 15 sample_vibration dev="dev01" param="virbation, saps=10000,ptps=2048"
watch -n 15 sample_vibration dev="dev01" param="tempature"
watch -n 60 sample_vibration dev="dev01" param="virbation, saps=10000,ptps=20480"watch -n 15 sample_vibration dev="dev02" param="virbation, saps=10000,ptps=2048"
watch -n 15 sample_vibration dev="dev02" param="tempature"
watch -n 60 sample_vibration dev="dev02" param="virbation, saps=10000,ptps=20480"
采样程序内部,进行资源的互斥访问。
2.3 采样数据的存储格式
2.3.1 存储格式
- 采集时间
- 通道号(注意通道分为采集设备通道,采集通道,总通道这几类,建议冗余记录)
- 振动体编号(电机,减速机等)
- 相对位置信息(输出轴,输入轴等)
- saps
- ptps
- 采样数据类型
- 采样数据单位
- 采样数据本身
- 关联通道(比如HVZ)
- 关联通道标记(H|V|Z)
- 附加信息
2.3.2 存储格式
2.3.2.1 通道部分
- 建议采样数据部分单独二进制压缩存储
- 为了避免可变长采样影响数据库存储,可以将一笔单独采样记录拆分成多组关联记录的模式。
- saps必须存储
- ptps必须存储
- 长采样拆分多组数据帧的关联编号比如:1-9
2.3.2.2 通道配置
可单独建表,与通道数据建立关联。
2.4 采样频率与采样时长以及循环采样周期的考量
- 采样频率完全取决于设备的最高故障频率,然后要结合硬件的防混叠滤波参数的频点,以及重要干扰源的频点。
- 采样时长决定了所能采集到的低频信号的频率,需要对照设备几大传动系统的最终输出轴频率来把握。对于难以覆盖到的某些长周期设备,可以考虑引入相位标记的方式来进行多组数据拼接,来达到最终的视图。然后,对于关键的设备,更久测量可以引入时域平均,并且可能抓取设备的启停瞬态,是更好的采样策略。
- 循环采样周期建议长短周期穿插测量,相对短暂的等周期稳态可以非常迅速地输出测量值,为故障分析提供参考点。然后长周期测量数据可以为后续的故障解算提供支持。然后可以考虑使用物理传感器或者峭度等直接信号处理的方法,将设备的采集分为盲闲时两种工况,避免数据流速过高。
3.推荐的中间件平台
1.mqtt支持数据驱动的多用户订阅,是理想的数据推送-接收平台。此时,数据推送、数据存储、数据接收这些职能可以做到彼此分离。
我第一版实现用了memory map,自行设计了采样数据中转,它在高速通讯时可能会用到,对于这种10秒,1分钟才会批量处理的应用,用不着,增加了不必要的复杂性。
2.内存级别的数据共享可以使用其他的模式,之前使用redis,非常方便,并且可以很容易地实现缓冲。mqtt只是一个通知-接收平台,它不保持数据(虽然可以设置为保存最后一笔数据)。
附录A 一个典型的MQTT推送程序
1.主程序
- 相关项目级的采样配置整体放在.json里。
- 命令行仅传递一个针对特定传感器,特定采样率的一组通道的单采集器采样(多通道)
- 在线程中做这件事,然后线程执行完毕,整个进程终止。
- 输出现在绑定在mqtt服务器上,这个程序会在执行完毕后去刷新特定的键值,把实际采样刷上去。
- 不涉及定时器循环采样。
- 用完所有的资源全部释放。防止出现内存泄露问题。
int main(int argc, char** argv) {const char *FILENAME_OF_CFG_IN_JSON = "xxxx.json";cJSON *jsonCfg = json_read_text_file(FILENAME_OF_CFG_IN_JSON);int idxGroup = -1;//得到所有的参数if(jsonCfg == NULL) exit(-1);if(argc!=2){printf("usage:{%s} <json_sample_group_idxbase0>\n", argv[0]);exit(-1);}sscanf(argv[1], "%d", &idxGroup);parse_cfg_from_json(&gAppConfig, jsonCfg, idxGroup);fflush(stdout);pthread_t thread_id;// 创建线程(关键步骤)int ret = pthread_create(&thread_id, // 存储线程IDNULL, // 默认线程属性mqtt_publish_thread, // 线程入口函数(void*)&gAppConfig // 传递参数给线程);if (ret != 0) {perror("Failed to create thread");return 1;}// 等待线程结束(避免主进程先退出)pthread_join(thread_id, NULL);printf("%s: Done!\n", argv[0]);fflush(stdout);return 0;
}
2.配置读取
- 这部分我应该做了一些冗余的无用功,因为配置应该能做到单行访问。
- 我内建了一个额外的结构体来暂存配置,以及运行时可以查询的json等信息。
- 那个sampleDesc源自配置文件中定义的mqtt上传payload定义。它也是json格式。
- 因为涉及到第几个采样通道,里面出现了必须通过snprintf()生成的 json.key的操作
- 里面附带了一笔string格式的json配置读取->全局配置的示例。
// 全局配置结构体
typedef struct {//略去sample_device 配置int saps;int ptps;int ch_cnts;int tgt_root_base_idx;int ch_src_list[128];int ch_tgt_list[128];char tgt_mqtt_ip[1024];int tgt_mqtt_port;char tgt_user[128];char tgt_pass[128];char tgt_topic[128]; //has moved finalfix:"\\"char common_root[128]; //has moved prefix:".\\"char resource_lock_root[256]; /*ip*/ cJSON *sampleDesc;
} AppConfig, *LPAppConfig;int parse_cfg_from_json(LPAppConfig papp, cJSON *jsonCfg, int idxGroup)
{//得到配置int i = 0;const char topic[1024];printf("sample group idx = %d\n", idxGroup);fflush(stdout);snprintf(topic, 1024, "channels[%d]\\source\\desc\\ip_addr", idxGroup);strncpy(papp->ip, cJSON_GetStringValue(gpJson_GetItemFromPath(jsonCfg, topic)), sizeof(papp->ip));....
3.采集器资源互斥
是通过mqtt上的特定topic和payload实现的,这部分实现有瑕疵,稍后调整。要考虑程序强行退出,并且现在事实上锁不住。只是思路:
// 消息回调函数(需提前注册)
void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) {if (msg->payloadlen == 8 && strncmp(msg->payload, "UNLOCKED", 8) == 0) {lock_acquired = 0; // 检测到解锁信号}
}// 发布确认回调
void on_publish(struct mosquitto *mosq, void *obj, int mid) {if (mid == publish_mid) {lock_acquired = 1; // 确认LOCKED消息发布成功}
}void request_lock(struct mosquitto *mosq, const char* resource_topic, int timeout_ms) {struct timeval start_time, current_time;gettimeofday(&start_time, NULL); // 记录起始时间// 订阅锁主题(QoS 1)mosquitto_subscribe(mosq, &lock_sub_mid, resource_topic, 1);// 发布锁请求(QoS 1)mosquitto_publish(mosq, &publish_mid, resource_topic, strlen("REQUEST"), "REQUEST", 1, false);static int retry_count = 0;bool timeout_occurred = false;while (!lock_acquired && !timeout_occurred) {// 非阻塞网络循环mosquitto_loop(mosq, 100, 1);if (!lock_acquired) {// 指数退避策略避免高频重试usleep(100000 * (1 << (retry_count < 8 ? retry_count++ : 8)));}// 毫秒级超时检测gettimeofday(¤t_time, NULL);long elapsed_ms = (current_time.tv_sec - start_time.tv_sec) * 1000 +(current_time.tv_usec - start_time.tv_usec) / 1000;timeout_occurred = (elapsed_ms >= timeout_ms);}if (timeout_occurred) {//mosquitto_unsubscribe(mosq, NULL, resource_topic); // 超时后解除订阅fprintf(stderr, "Lock request timeout after %d ms\n", timeout_ms);}retry_count = 0; // 重置计数器
}void release_lock(struct mosquitto *mosq, const char* resource_topic) {mosquitto_publish(mosq, &publish_mid, resource_topic, strlen("UNLOCKED"), "UNLOCKED", 1/*qos*/, false);mosquitto_unsubscribe(mosq, &lock_sub_mid, resource_topic);
}
void drop_lock_request(struct mosquitto *mosq, const char* resource_topic) {mosquitto_unsubscribe(mosq, &lock_sub_mid, resource_topic);
}
4.线程主执行体
- 计算设备资源锁mqtt_topic.
- 得到采集设备资源锁
- 采样
- 释放资源锁
- mqtt_post.
// MQTT发布线程
void* mqtt_publish_thread(void* arg) {LPAppConfig papp = (LPAppConfig)arg;const char lock_topic[4096]={0};const char ch_topic_fmt[4096]={0};strncpy(lock_topic, papp->resource_lock_root, sizeof(lock_topic));struct mosquitto *mosq = mosquitto_new(NULL, true, papp); //first params :"lock_client"mosquitto_username_pw_set(mosq, papp->tgt_user, papp->tgt_pass);mosquitto_connect(mosq, papp->tgt_mqtt_ip, papp->tgt_mqtt_port, 60);// 注册回调mosquitto_message_callback_set(mosq, on_message);mosquitto_publish_callback_set(mosq, on_publish);request_lock(mosq, lock_topic, gtimeout_ms);if(!lock_acquired){drop_lock_request(mosq, lock_topic);if(!lock_acquired){printf("{%s:%d}:Lock acquired error! sample procedure dropped!\n", papp->tgt_mqtt_ip, 0);return NULL;}}char timeStart[128];strncpy(timeStart, gptimestr(), sizeof(timeStart));int ret = do_sample(papp->ip, papp->ch_cnts, papp->ch_src_list, papp->saps, papp->ptps, 10*1000); //TIMEOUT:10s//释放锁 - 采样完毕,提前释放release_lock(mosq, lock_topic);if(0 == ret){//papp->ch_chnts sample后变为一次采集的通道总数do_sample_data_post(mosq, papp->tgt_topic, papp->common_root, papp->tgt_root_base_idx, papp->ch_tgt_list, papp->ptps, papp->sampleDesc, timeStart, papp->ch_cnts);}return NULL;
}
5.采集
欠奉
6.数据推送
- 这里我怀疑自己依然做了一些无用功。本身是要修改目标的待post的已知 json模板,应该直接去改json,我仍然做了一级struct映射。
- 基本的考量是用户在实际采样时,可能会根据情况,动态切换采样参数,比如采样时长。
- 把可能改动的结构体名字中主动添加版本信息是这次想到的。你需要可视化你的设计意图。也需要向维护阶段的同事透露你的设计意图。代码本身就能做到。
- fill_sample_data()是个可服用的模块,修改上传通道可以很容易复用。
- 现在的代码结构,还是有一点和mqtt强耦合。
int do_sample_data_post(struct mosquitto *mosq, const char *mqtt_topic_root, const char *data_type, int tgt_idx_base, int *tgt_chIdx, int ptps, cJSON*payloadFmt, const char *timeStart, int chCntOfSample)
{// 模拟数据传输(实际替换为真实采集逻辑)char topic[2048];char dumb_hex[MAX_SAMPLE_PTPS*sizeof(double)+1];int chOfSample = 0;char timeEnd[128];strncpy(timeEnd, gptimestr(), sizeof(timeEnd));for(chOfSample = 0; tgt_chIdx[chOfSample] != -1; chOfSample++){if(mqtt_topic_root[strlen(mqtt_topic_root)-1]=='/') snprintf(topic, 2048, "%s%d/%s", mqtt_topic_root, tgt_idx_base+ tgt_chIdx[chOfSample], data_type);else snprintf(topic, 2048, "%s/%d/%s", mqtt_topic_root, tgt_idx_base+ tgt_chIdx[chOfSample], data_type);printf("mqtt topic : [%s] baseidx=%d srcSampleIdx=%d, tgtIdx=%d\n", topic, tgt_idx_base, chOfSample, tgt_chIdx[chOfSample]);fflush(stdout);KDE_DATASET_SAMPLE_RAW_CH_DESC_V_1_1_20250611 desc;strncpy(desc.debug_info.start, SHORT_TIME_STR(timeStart), 8);strncpy(desc.debug_info.stop, SHORT_TIME_STR(timeEnd), 8);desc.sample_data.ch = tgt_idx_base + tgt_chIdx[chOfSample];desc.sample_data.ch_peer = 0;strncpy(desc.sample_data.sampletime, timeEnd, sizeof(desc.sample_data.sampletime));u16_to_little_endian_hex(dumb_hex, gChRawFrames[chOfSample].binData, ptps);desc.sample_data.sample_data = &dumb_hex[0];desc.sample_desc.aux_info.fields.ch_of_sample = chOfSample;desc.sample_desc.aux_info.fields.ch_of_sample_dev = gAppConfig.ch_src_list[chOfSample];desc.sample_desc.aux_info.fields.chcnt_of_sample = chCntOfSample;desc.sample_desc.ptps = ptps;printf("sample_time:%s\n", desc.sample_data.sampletime);fill_sample_data(payloadFmt, &desc);const char *payload = cJSON_Print(payloadFmt);mosquitto_publish(mosq, NULL, topic, strlen(payload), payload, 1, true); usleep(1000);}
}