当前位置: 首页 > wzjs >正文

比特币简易网站开发搜索指数在线查询

比特币简易网站开发,搜索指数在线查询,wordpress oneup,中国人才网背景 我们正在使用Rust开发基于RAG的知识库系统,其中对于模型的回复使用了常用的SSE,Web框架使用Rocket,Rocket提供了一个简单的方式支持SSE,但没有会话保持、会话恢复等功能,因此我们自己简单实现这两个功能。 使用R…

背景

我们正在使用Rust开发基于RAG的知识库系统,其中对于模型的回复使用了常用的SSE,Web框架使用Rocket,Rocket提供了一个简单的方式支持SSE,但没有会话保持、会话恢复等功能,因此我们自己简单实现这两个功能。

在这里插入图片描述

使用Rocket推送消息流

以下,是Rocket给出的示例:

#[get("/text/stream")]
fn stream() -> TextStream![String] {TextStream! {let mut interval = time::interval(Duration::from_secs(1));for i in 0..10 {yield format!("n: {}", i);interval.tick().await;}}
}

我们需要改造这个示例,以满足将模型回复的消息推送给前端的需求。

首先,对于既然推流,需要知道将流推送给谁,也就是要推送到哪个会话中,所以我们在发起会话的时候,需要一个会话ID来标识一个唯一的会话。

我们使用sse.js这个库作为SSE的客户端,用于发起SSE连接,该库可以通过发起POST请求来建立连接,可以携带额外的数据和请求头。

使用以下结构来接收一个对话请求:

pub struct ChatMessageReq {/// 会话IDpub session_id: String,/// 消息内容pub content: String,
}

于是我们的接口需要修改为这样:

#[post("/chat", data = "<req>")]
async fn question(req: Json<ChatMessageReq>) -> (ContentType, TextStream<impl Stream<Item = String>>) 
{//TODO
}

其中TextStream<impl Stream<Item = String>>等价于TextStream![String]

需要注意的是,如果返回值没有指定ContentType,那么Rocket默认响应的ContentType是文本类型,而非stream类型,会导致前端无法解析。

接下来我们实现会话管理功能。

我们定义了一个名为SsePool的struct,来存储并管理SSE连接:

struct SsePool {/// 消息流传输通道channel: DashMap<String, Sender<SseEvent>>,
}impl SsePool {/// 初始化连接池pub fn init() -> Self {Self {channel: DashMap::new(),}}/// 移除连接fn remove(&self, id: &String) {if let Some((_, sender)) = self.channel.remove(id) {drop(sender);}}/// 获取连接fn get_sender(&self, id: &String) -> Option<Sender<SseEvent>> {self.channel.get(id).map(|v| v.value().clone())}/// 新建channelpub fn new_channel(&self, id: String) -> (Sender<SseEvent>, Receiver<SseEvent>) {let (sender, receiver) = tokio::sync::mpsc::channel(10_0000);// 获取并移除旧senderlet old_sender = self.channel.remove(&id).map(|(_, s)| s);// 插入新senderself.channel.insert(id, sender.clone());// 处理旧senderif let Some(old_sender) = old_sender {tokio::spawn(async move {// 发送终止信号let _ = old_sender.send(SseEvent::Abort).await;});}(sender, receiver)}/// 发送消息pub async fn send_message(&self, id: &String, message: ChatMessage) {if let Some(sender) = self.get_sender(id) {if let Err(e) = sender.send(SseEvent::ChatMessage(message)).await {log::warn!("消息发送失败,session id: {},失败原因:{}", &id, e);};// drop(sender);}}
}

其中channel使用的是tokio中mpsc的channel。

值得注意的是,new_channel中,新建连接时,需要向channel发送一条终止事件,确保已有的receiver关闭,返回新的receiver,这一点是用于后续的会话恢复使用。new_channel会返回receiver和sender,用于消息接收和发送。

当收到模型回复是,调用SsePool::send_message发送消息到channel,再头通过receiver接收消息,转发到前端。

可以把它初始化到静态变量中,方便全局调用:

static SSE_POOL: LazyLock<SsePool> = LazyLock::new(|| SsePool::init());

于是,我们的接口可以完善为以下内容:

#[post("/chat", data = "<req>")]
async fn question(req: Json<ChatMessageReq>,
) -> (ContentType, TextStream<impl Stream<Item = String>>){// 请求新消息,并返回receiverlet (_, _, mut receiver) = service::new_message(req).await.unwrap();let stream = TextStream! {// 持续接收receiver的消息,然后推送到前端while let Some(item) = receiver.recv().await {match item{//模型回复的消息SseEvent::ChatMessage(message) => {yield SseEvent::ChatMessage(message.clone()).to_message();if SseEvent::is_done(&message) {// 推送消息yield SseEvent::Abort.to_message();break;}},// 关闭通道SseEvent::Abort => {yield SseEvent::Abort.to_message();break;},_ => {}}}yield SseEvent::Abort.to_message();drop(receiver);};(ContentType::EventStream, stream)
}

至此,新会话的接口就完成了。

接下来是会话的恢复。

当前端切换会话或刷新页面时,我们希望能够继续收到未回复完成的消息,所以需要一个用于会话恢复的接口。同样的,接口需要会话ID来区分恢复哪一个会话。

#[post("/resume-stream", data = "<req>")]
async fn resume_stream(req: Json<ResumeStreamReq>,
) -> (ContentType, TextStream<impl Stream<Item = String>>){// 会话IDlet session_id = req.session_id.clone();let stream = TextStream! {// 尝试恢复会话,并返回receiver,如果能够返回receiver说明会话未完成,否则已经完成if let Some(mut receiver) = service::resume_stream(&req.session_id).await.unwrap(){// 持续接收未回复完成的消息while let Some(item) = receiver.recv().await {match item {// 模型回复的消息SseEvent::ChatMessage(message) => {yield SseEvent::ChatMessage(message.clone()).to_message();if SseEvent::is_done(&message) {yield SseEvent::Abort.to_message();break;}}// 关闭通道SseEvent::Abort => {yield SseEvent::Abort.to_message();break;}_ => {}}}drop(receiver);}yield SseEvent::Abort.to_message();};(ContentType::EventStream, stream)
}

service::resume_stream中,首先检查对应会话ID的channel是否存在,存在则新建channel返回receiver,不存在则表明已经回复完成。

pub async fn resume_stream(session_id: &String,
) -> AppResult<Option<Receiver<SseEvent>>> {if let None = chat::get_connection(session_id) {return Ok(None);}// 获取会话对应的channel,如果channel存在则标识消息仍在回复中let (_, receiver) = chat::new_connection(session_id);Ok(Some(receiver))
}

至此,便实现了会话恢复,刷新页面后仍能后接收strem消息。

总结

使用Rust写这些业务代码的速度,终归是没有Java快,一些常用的库,没有Java系列封装的简单易用,不过应用占用资源确实比Java小很多。

本次使用的一些库:

  • tokio:异步运行环境,以及mpsc的channel,
  • dashmap:支持并发的hashmap,但是使用不当容易造成死锁。
http://www.dtcms.com/wzjs/9192.html

相关文章:

  • 杭州 网站建设手机网站关键词快速排名
  • 网站建设钟振森最新国内新闻50条简短
  • 界面做的比较好的网站如何做个网站推广自己产品
  • 外贸英文网站建设价格友情链接发布网
  • 做电锯电音的网站营销型网站建设排名
  • 建设大型网站公司网站搭建
  • 珠海专业网站制作网络舆情管理
  • 网站运营有前途吗怎么样优化网站seo
  • 网站建设投资百度一下 你就知道官方
  • 非凡网站建设最近新闻热点事件
  • 用模板做企业网站杭州网站seo价格
  • 郴州市人口关键词优化推广排名
  • 微信平台开发费用谷歌seo站内优化
  • 大型网站建设兴田德润赞扬搜索引擎平台有哪些
  • wordpress 逻辑表单企业seo服务
  • seo查询 站长之家广州网页seo排名
  • 休闲食品网站建设策划书凡科网
  • 网站百度seo推广怎么做如何创建自己的网站平台
  • 网站建设宣传单网站建设企业建站
  • wordpress观点全面的seo网站优化排名
  • 教学网站线上销售平台如何推广
  • 男女做差差事的视频网站高端网站定制设计
  • 电子商务网站建设大二实训厦门百度代理
  • 开家给别人做网站公司我想接app注册推广单
  • 微信公众号的网站开发百度客服中心
  • 怎么做网站web疫情最新情况 最新消息 全国
  • 企业网站建设的核心是超级外链
  • 网站式的公司记录怎么做反向链接查询
  • 百度seo专业网站平台软件定制开发
  • 做网站单页视频湘潭关键词优化公司