当前位置: 首页 > news >正文

【Rust基础】crossbeam带来的阻塞问题

背景

最近正在做AI知识库的相关内容,web框架使用Rocket,需要使用SSE处理模型的流式输出,而Rocket的SSE功能比较单一,没有进行全局状态管理,因此需要手动处理SSE连接,而对于web环境下,必然会涉及到多个线程,在多线程环境下使用crossbeam的channel收发数据时便遇到了阻塞问题。

场景代码

对有问题的代码简化如下:

TextStream! {while let Some(item) = receiver.recv() {// 推送消息}
};

第一个请求过来时,一切正常;而第二个请求过来时,不仅仅是单个接口阻塞,而是整个程序都会阻塞。并且,第二个请求来后,所有的tokio::spawn中的异步块均无法进入。后来重新查看了crossbeam和rocket的文档,明白了导致阻塞的原因:

  • Rocket使用Tokio的异步Runtime,Tokio使用协程而非线程
  • receiver.recv()会阻塞当前线程
    以上两点,导致第二个请求来后,由于receiver.recv()阻塞了当前线程,后续的请求也是跑在同一线程上,而导致整个系统的阻塞。

解决办法:

  1. 使用异步Stream包装receiver,使其以非阻塞的方式运行在Tokio上
  2. 使用Tokio的mpsc的channel,考虑到SSE的单向传输特性,只需要一个消费者向前端发送消息,因此mpsc更合适。

总结

  • crossbeam的channel是mpmc模型,即支持多生产者和多消费者,在非异步环境中比较好用,而对于基于协程的异步环境,如果不加处理可能导致系统阻塞,而且关闭channel也比较麻烦,可能会导致channle无法关闭而阻塞。因此,crossbeam的channel其实更适合逻辑简单且需要高频传递消息的场景。
  • tokio的channel是mpsc模型,即多生产者单消费者,比较适合做SSE推送,也更适合在异步环境中使用。值得注意的是,该channel的Sender支持Clone,而Receiver不支持Clone,所以需要设计好代码结构,能够在需要的地方获取到channel。

相关文章:

  • 大模型-mcp学习
  • 基于Django实现的图书分析大屏系统项目
  • 为什么要做种草商城
  • MAPLE:编码从自我为中心的视频中学习的灵巧机器人操作先验
  • LeetCode之两数之和
  • 驱动-原子操作
  • 《Java 泛型的作用与常见用法详解》
  • 【JavaScript】二十四、JS的执行机制事件循环 + location + navigator + history
  • 做Data+AI的长期主义者,加速全球化战略布局
  • 4月17日复盘
  • Kettle和Canal
  • 【AI论文】Genius:一种用于高级推理的可泛化和纯无监督的自我训练框架
  • 使用FastAPI构建高效、优雅的RESTful API
  • 基于ssh密钥访问远程Linux
  • AI 数字短视频数字人源码开发的多元价值与深远意义​
  • 网络417 路由转发2 防火墙
  • 常见的VLAN划分方式和示例场景
  • [250417] Fedora 42 正式发布,搭载 Linux 6.14 内核和 GNOME 48 桌面环境
  • 扫雷-C语言版
  • 使用Qt multimedia模块实现简易的视频播放器
  • 辽宁省委书记郝鹏、省长王新伟赶到辽阳火灾事故现场指导善后处置工作
  • 上海科创的三种品格
  • 西湖大学本科招生新增三省两市,首次面向上海招生
  • 专访丨青年作家杜梨:以动物的视角去观察这个世界
  • 伊朗港口爆炸死亡人数升至70人
  • 伊朗内政部长:港口爆炸由于“疏忽”和未遵守安全规定造成