当前位置: 首页 > news >正文

【最后203篇系列】015 几种消息队列的思考

背景

队列还是非常重要的中间件,可以帮助我们:提高处理效率、完成更复杂的处理流程

最初,我觉得只要掌握一种消息队列就够了,现在想想挺好笑的。

过去的探索

因为我用python,而rabbitmq比较贴合快速和复杂的数据处理,然后就选了这款。一开始我觉得不靠谱,服务老是断了,后来发现,是因为ubuntu做服务器如果用wifi的话天然容易这样。当时不明白为什么,所以弃用了rabbitmq一段时间。

后来再次使用时,功能倒是大体ok了,在早期版本好像直接支持延时消息啥的,当时都把插件装上搞好了。后来又觉得rabbitmq的并发太小了,在大规模etl的时候比较吃不消。所以又弃用了。

再后来,我想搞一些简单,使用一点的队列,于是搞了redis stream。在某些方面的确也还挺好用,速度和吞吐都还比较大。但是是吃内存的,而且在后来也出过一次问题:在维持十几个队列进行大量流转时,我发现有些队列失效了,要删除才能释放内存。这样的隐患看起来更大,所以又多少被打入冷宫。

兜兜转转又回到了kafka。这个是最早我比较排斥的,因为看起来比价麻烦,而且更偏向java。但搭起来的确很好用,吞吐大,也不需要内存,主要依赖硬盘。但是kafka搭起来稍微麻烦一点,组要zookeeper,而且对网络和机器资源的要求很高。有一次,我不确定是不是kafka的问题,把我一台机器的网络整个带崩了。当然大部分时候也是没问题的。

后来也陆续看了一些队列,但类型也就是上面三种了,差异不大。还有个zmq,感觉是更底层的队列转发,没去管了。

一些对应的结论:

  • 1 rabbitmq 可以在稍微小的场景用,功能可以比较全面,方便重试等
  • 2 redis 把持久化去掉,不需要了,这样万一stream出问题重启服务就行
  • 3 kafka 适合保存大量的对话日志,有7天滚动删除,需要用资源稍微高一点的机器运行

新的思考

最近碰到一些问题,又触发了我关于这方面的思考。

问题1:一个同事抱怨请求的微服务失败率过高,任务失败后他的重试比较难搞?

问题2:机器人会有很多零散的数据需要向量化,而现有的向量化微服务是处理批量的,这样导致了能力无法输出?

问题3:大模型不断出新的模型,以及现有的接口价格还是稍贵(虽然已经是业内最低),如何能确保替换?

对于问题1,后来我发现还是数据连接失活的问题,已经解决掉了。但如果是大模型接口不稳定导致的问题,应该如何解决呢? – RabbitMQ

由于调用大模型处理的需求一般都是比较昂贵且缓慢的,这意味着天然的并发就不会太高。RabbitMQ即使在消息体很大的情况下,应该也能做到2000左右的并发(这个后续我可以压一下),那样在并发处理上就够了。

然后利用rabbitmq本身丰富的机制,比如死信队列这种来完成重试。
在这里插入图片描述
这样可以应对接口的不稳定调用情况,减少我们自己进行失败的检查和调度。

对于问题2,应该就是做一个微批次服务了。服务端用队列接收请求,只有到一定批次时或者到指定轮询时间(1s),服务才会处理队列的数据,此时就可以发挥服务的批量处理效率了。这对于矩阵处理类的服务特别有效,使用redis stream这样简单的队列来完成这种服务正好。 – Redis Stream。

对于问题3,那么就是一个广播的过程。用kafka比较合适,一方面可以支持很大的吞吐,然后对于不同的消费者,这时应该是不同的模型都可以重复消费。每一个input,只存一次,在kafka,然后可以被重复消费,消费的结果进行实时比对。胜利的模型上台,失败的模型退位。

还有一个比较让我本能抗拒的问题,但其实应该是可以的,后面我也要尝试。

【实时队列服务】服务只是一个消息入口,并不直接处理,而是发到kafka。然后由多个worker盯着kafka进行消费。

这种间接服务是有点不靠谱的,抛开入口服务不谈,这里有kafka队列和worker两个不稳定因素。但如果可行的话,这样反而是比较好的:

  • 1 数据存在历史(7天缓存),必要的时候可以追溯和回放
  • 2 数据存到kafka,可以有更高的弹性处理能力,对那些延时要求不高的,比如允许timeout 30秒的任务来说肯定是可以的

worker处理完之后进行返回 ,可以采用webhook, websocket或者sse的方式将结果实时的返给请求。

【复杂ETL流转】将数据的处理抽象为在若干个kafka之间进行流转。

这样最大的好处是可以让不同人/流程之间的交互变的简单,可能会稍微费点硬盘,但应该是值得的。

相关文章:

  • ORA-00600错误的深度剖析:如何避免与解决?
  • 蓝桥杯宝石,考察数学。考察公式推导能力
  • 设计模式(行为型)-命令模式
  • 【MySQL】MySQL数据存储机制之存储引擎
  • Vim 编辑器-实现基础跳转
  • MCP 开放协议
  • 55-交换机堆叠
  • P4924 [1007] 魔法少女小Scarlet
  • 模板初阶:
  • 判断一个数是否是质数(素数)
  • 【递归与动态规划(DP) C/C++】(1)递归 与 动态规划(DP)
  • 图书管理借阅系统(豪华版)
  • python二级每日十题(1)
  • css盒子模型第二章(margin padding border content)
  • 从零开始 | C语言基础刷题DAY3
  • 深入解析Hosts文件:从原理到实战应用(文末附Qwins下载)
  • Couldn‘t install PSEXESVC service: 拒绝访问。
  • 宇树科技纯技能要求总结
  • 大话数据结构第一章,数据结构绪论笔记
  • 项目实战系列:基于瑞萨RA6M5构建多节点OTA升级-系统设计<一>
  • 秘鲁总理辞职
  • 沙青青评《通勤梦魇》︱“人机组合”的通勤之路
  • “11+2”复式票,宝山购彩者领走大乐透1170万头奖
  • 一手实测深夜发布的世界首个设计Agent - Lovart。
  • 国产水陆两栖大飞机AG600批产首架机完成总装下线
  • 茅台回应“茅台1935脱离千元价位带竞争”:愿与兄弟酒企共同培育理性消费生态