当前位置: 首页 > news >正文

rabbitmq简介与基本使用

rabbitmq简介与基本使用

RabbitMQ 是轻量级的,易于在本地和云端部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合配置中,以满足大规模、高可用性的要求。

一、RabbitMQ的结构

rabbitmq遵循AMQP协议。

image-20240505225206054

  • Broker:接收和分发消息的应用,RabbitMQ就是MessageBroker
  • Virtual Host:虚拟Broker,将多个单元隔离开
  • Connection: publisher/consumer和broker之间的tcp连接
  • Channel:connection内部建立的逻辑连接,通常每个线程创建单独的channel
  • Routing Key: 路由键,用来指示消息的路由转发,相当于快递的地址
  • Exchange:交换机,相当于快递的分拨中心
  • Queue: 队列,消息最终被送到这里等待consumer取走
  • Binding:exchange和queue之间的虚拟连接,用于message的分发依据

Exhange是AMQP协议和rabbitmq的核心组件

  • Exchange的功能是根据绑定关系和路由键为消息提供路由,将消息转发值相应的队列
  • exchange有4种类型:Direct/Topic/Fanout/Heders,其中Headers使用很少,以前三种为主
  1. Direct(直接路由):Routing Key =Binding Key,容易配置使用
  2. Fanout(广播路由):群发绑定的所有队列,使用与消息广播
  3. Topic(话题路由):功能较为复杂,但使用灵活,建议优先使用,为以后拓展留余地。

二、应用场景

1、流量削峰

举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。

2.应用解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

image-20240505230523883

3.异步处理

有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个callback api,B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。

image-20240505230546139

三、基本使用

3.1 生产者

static String QUEUE = "restaurant";static String EXCHANGE = "exchange.order.restaurant";@Testpublic void mqTestConnect() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");factory.setHost("192.168.6.131");factory.setPort(5672);try (Connection conn = factory.newConnection();Channel channel = conn.openChannel().get()){/*** 生成一个交换机* 1.交换机名称* 2.交换机里面的消息是否持久化 默认消息存储在内存中* 3.是否自动删除 如果服务器在不再使用交换机时删除该交换机* 4.其他参数*/channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.DIRECT,true,false,null);/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE, false, false, false, null);/*** 绑定交换机与队列*/channel.queueBind(QUEUE,EXCHANGE,"key.order");String payload = "msg";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish(EXCHANGE,"key.order",null,payload.getBytes());}}

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

rabbitmq控制台就会有对应的交换机与队列以及消息

image-20240505233402453

绑定信息

image-20240505233602274

消息

image-20240505233636306

3.2 消费者代码

    @Testpublic void mqConsumer() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");factory.setHost("192.168.6.131");factory.setPort(5672);try (Connection conn = factory.newConnection();Channel channel = conn.openChannel().get()){log.info("start listening message...");/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/channel.basicConsume(QUEUE, deliverCallback, consumerTag -> {log.info("消息消费被中断");});while (true) {Thread.sleep(100000);}} catch (InterruptedException e) {throw new RuntimeException(e);}}DeliverCallback deliverCallback = ((consumerTag, message) -> {String messageBody = new String(message.getBody());log.info("deliverCallback:messageBody:{}", messageBody);});

运行后,控制台打印

image-20240505233926050

http://www.dtcms.com/a/410913.html

相关文章:

  • LlamaIndex智能体Agents开发全攻略
  • c++如何实现高性能线程安全队列
  • java-Map集合
  • Web开发:ABP框架14——多个实现,一个接口的高端写法
  • Springboot社区集市摆摊管理系统jv9kp(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • 国产手机百花齐放,如何化解“选择困难症”?
  • 百度主机做视频网站怎么样淘宝客是以下哪个网站的会员简称
  • AI 算力加速指南
  • 【財運到】股票期货盯盘助手-自选股界面介绍
  • 架构入门系列:用数学公式估算服务器数量的实战指南
  • Redis02-Ehcache缓存
  • 结合 SSH 22 + 2222 备用端口 + 临时保护 + 长期守护 + 防火墙 的终极一行命令版本
  • 使用虚幻引擎时间轴制作一个弹跳小球
  • 网站推广和精准seo深圳网站设计兴田德润i简介
  • 从比分到直播流畅度:API 在体育观赛中的关键作用
  • JavaScript又忘了,忘了?太正常了!忘了?太正常了!重新上路:
  • 全新一代北斗三号短报文通信SoC芯片在北斗规模应用国际峰会发布
  • 佛山做企业网站的公司专业设计网站有哪些
  • 户用储能微型逆变器计量电表防逆流
  • 通过手动安装本地部署live-torrent (影视搜索,云播客户端)
  • 学做立体书的网站网站怎么做gps定位
  • 【实时Linux实战系列】实时系统的现场变更与灰度发布
  • 做个简单网站大概多少钱it培训机构排名北京
  • Spring Boot 自动配置之 TaskScheduler
  • .NET Framework 3.5官网下载与5种常见故障解决方法
  • nginx的访问控制、用户认证、https
  • 网站建设完整网站如何做图片特效
  • 服装类跟单系统:提升供应链管理效率的利器
  • 基于微信小程序的旅游景点系统【2026最新】
  • 网站建设升级网站开发项目架构