当前位置: 首页 > news >正文

从零开始学RabbitMQ:Java实战简单模式与工作队列消息处理

一、导入依赖       

         创建一个新的Java空项目,选择maven类型:

        首先引入依赖,打开maven中央仓库,搜索RabbitMQ,选择稳定且使用较多的版本即可

        大家也可以直接复制如下依赖,直接导入pom文件中,并刷新即可

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.25.0</version>
</dependency>

        我们新建的是Java空项目,没有任何依赖,导入RabbitMQ依赖,需要手动输入<dependencies>标签

准备工作

        在 linux 系统上部署 Erlang 环境,再安装 RabbitMQ ,再Web界面创建个用户并设置相关权限(后续某天心血来潮会写教程的)——完成上述内容后,那么就开始认识一下RabbitMQ 的七种工作模式以及模拟实现

二、简单模式

        简单模式很好理解,RabbitMQ再其中扮演消息中转站,P把请求发送至消息队列,然后C就从队列中获取。举个栗子🌰:在电子邮件发送流程中,服务器通常会先将待发送文件存入消息队列,然后由邮件服务从队列中取出并发送给目标用户。这种设计让服务器无需等待邮件发送完成就能立即返回响应,将实际的发送工作交由后续服务处理,从而显著提升服务器的处理效率

        那么我们接下来来看,如何使用消息队列:(消费者)

2.1、建立连接

        首先是建立连接,需要设置一下参数:IP、端口号、账号、密码和虚拟主机,如下代码片段:必须保证用户、密码和虚拟机已事先创建好

        ConnectionFactory factory=new ConnectionFactory();factory.setHost("⚠️此处为自己的公网ip⚠️");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("csdn");Connection connection = factory.newConnection();

2.2、创建Channel(开启信道)

        Channel channel = connection.createChannel();

2.3、声明交换机

        此处直接采用RabbitMQ默认提供的交换机即可,不作代码解释

2.4、声明队列

        channel.queueDeclare("aokey",true,false,false,null);

参数说明:

        queue: 队列名称

        durable: 持久化 true 队列设置为持久化,消息会存盘,服务器重启后消息不会丢

        exclusive: 仅允许一个消费者监听该队列,当Connection关闭时,队列将被删除

        autoDelete: 自动删除,当没有消费者时自动删除队列

        arguments: 其他参数配置 

2.5、发送消息

参数说明:

        exchange:交换机名称。在简单模式下,将默认使用空字符串("")

        routingKey:路由键名称,其值等同于队列名称

        props:消息的配置参数

        body:待发送的消息数据

2.6、释放资源

        channel.close();connection.close();

完整代码:

public class SimpleMQProduce {public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接ConnectionFactory factory=new ConnectionFactory();factory.setHost("云服务器IP");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("csdn");Connection connection = factory.newConnection();//2. 创建channel通道Channel channel = connection.createChannel();//3、使用交换机//4、声明队列String str="肖利波是个仁";channel.queueDeclare("aokey",true,false,false,null);channel.basicPublish("","aokey",null,str.getBytes());//5、释放资源channel.close();connection.close();}
}

2.7、运行结果:

        点击进入队列后,可获取消息队列中的信息

消费者:

        代码大体和上述一样,只是消费者是消费元素        

public class SimpleMQConsumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("云服务器IP");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("aokey");connectionFactory.setUsername("study");connectionFactory.setPassword("study");Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();channel.queueDeclare("csdn",true,false,false,null);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println(new String(body));}};channel.basicConsume("csdn",true,consumer);channel.close();connection.close();}
}

参数说明:

        consumerTag:消费者标签,通常由消费者在订阅队列时指定

        envelope:包含消息的封包信息,如队列名称、交换机等

        properties:消息相关配置信息

        body:消息的具体内容

        使用匿名内部类继承DefaultConsumer,重写handleDelivery方法来直接输出消息内容(可按照不同需求完成接收到消息的响应处理)

运行后:

三、工作队列模式

        由图可知:P发送的消息传入队列后,C1和C2都可以接收到,类似于班级群班长发布通知:“下午5点要开个班会,地点...”,全体成员都能看到通知并参与班会

生产者

public class WorkMQProduce {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);for (int i = 1; i <= 10; i++) {String str="我爱小奥奇💗的第 "+i+" 天";channel.basicPublish("",Constants.WORK_QUEUE,null,str.getBytes());}channel.close();connection.close();}
}

消费者

        (定义两个消费者)

public class WorkMQConsumer2 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);connectionFactory.setUsername(Constants.USER_NAME);connectionFactory.setPassword(Constants.PASSWORD);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println(new String(body));}};channel.basicConsume(Constants.WORK_QUEUE,true,consumer);}
}

        并启动消费者,不进行释放资源,此时消费者处于等待消息的状态,然后启动生产者并观察消费者的接收情况:

        总得来说,工作队列模式和简单模式差不多,只是消费者存在多个,共同消费生产者生产的消息

http://www.dtcms.com/a/418581.html

相关文章:

  • 农家乐网站模板腾讯云电商网站建设
  • 响应式网站用什么软件做效果站酷设计网站官网入口免费个人海报
  • JavaScript中国手机号校验
  • 【OJ】stack 的经典OJ题
  • 算法1.0
  • 语义网络对人工智能自然语言处理中深层语义分析的影响与启示
  • HCTF2018
  • 网站定制设计深圳网络营销优化
  • 西安建站推广做网站为什么选择竞网智赢
  • 学习日报 20250928|Java日志规范:从基础规约到高级实践(含SkyWalking整合)
  • 江协科技 CAN总线入门课程(CAN简介硬件电路)
  • 每周资讯 | 腾讯《三角洲行动》周年庆登双榜TOP1;腾讯首款生活模拟游戏《粒粒的小人国》曝光
  • 做网站 业务流程图网站成立时间
  • 第四部分:VTK常用类详解(第96章 vtkCaptionActor2D标题演员类)
  • 数据可视化 | Violin Plot小提琴图Python实现 数据分布密度可视化科研图表
  • STM32H743-ARM例程11-PWM
  • 网站建设的功能和目标郑州网站建设企业
  • 网站与手机app是一体吗wordpress 中文建站
  • unzip-6.0-21.el7.x86_64.rpm怎么安装?CentOS 7手动安装rpm包详细步骤
  • Go 的切片原理
  • GDAL 的内置矢量工具集ogr的详解使用
  • ppt制作软件模板网站wordpress 邮件投稿
  • Git - git status 观察记录(初始化本地仓库、初始暂存、初始提交、修改文件、第二次暂存、第二次提交)
  • 帝国网站管理系统安装山西省住房和城乡建设部网站
  • 怎么选择昆明网站建设长沙县星沙人才招聘网
  • 【C++】AVL详解
  • SQLE:一个全方位的SQL质量管理平台
  • 基于51单片机智能台灯无线WIFI控制LED灯亮灭亮度APP设计
  • postgres linux 环境psql 中文乱码处理
  • “静态前端 + Serverless API”** 架构做视频站