当前位置: 首页 > news >正文

rabbitMQ 的安装和使用

1、ubuntu24.04 安装 rabbitMQ

      sudo apt update
      sudo apt install erlang
      sudo apt install rabbitmq-server

      安装管理插件:sudo rabbitmq-plugins enable rabbitmq_management

        启动服务  sudo systemctl start rabbitmq-server  
        设置开机自启  sudo systemctl enable rabbitmq-server
        检查服务状态  sudo systemctl status rabbitmq-server

    新建用户并授权:

        # 创建新用户(将 your_username 和 your_password替换为你自己的)
        sudo rabbitmqctl add_user your_username your_password
        # 赋予管理员权限
        sudo rabbitmqctl set_user_tags your_username administrator
        # 授予所有权限
        sudo rabbitmqctl set_permissions -p / your_username ".*" ".*" ".*"

    登录管理页面:

            浏览器打开 ​http://ip:15672/   ,使用新建的用户和密码登录           

2、使用demo

          pom.xml中引入:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

           生产者:确认消息发送成功的几种级别,可靠级别依次升高

               1、消息写入到缓冲区,也就是以下代码中的实现(不可靠);

               2、消息到达rabbitMQ服务器(较可靠)

                channel.txSelect();
                try {
                    channel.basicPublish("", "my_queue", null, message.getBytes());
                    channel.txCommit(); // 消息确认到达服务器
                    System.out.println("消息已到达服务器");
                } catch (Exception e) {
                    channel.txRollback();
                    System.out.println("消息发送失败");
                }

               3、消息到达rabbitMQ服务器并持久化到磁盘 (高可靠)

            // 持久化队列 + 持久化消息 + 发布者确认
            channel.queueDeclare("my_queue", true, false, false, null);
            channel.confirmSelect();

            channel.basicPublish("", "my_queue", 
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());

            if (channel.waitForConfirms()) {
                System.out.println("消息已持久化到磁盘");
            }

               4、到达队列并准备好消费(最可靠)

                // 使用 mandatory 标志和 ReturnListener
                channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
                    System.out.println("消息无法路由到队列: " + replyText);
                });

                channel.basicPublish("", "my_queue", 
                    true, // mandatory = true, 如果无法路由到队列会返回
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes());
 

生产者demo:

public class Producer {public static void main(String[] args) throws Exception {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.187.133"); // RabbitMQ服务器地址factory.setPort(5672);        // 默认客户端端口factory.setUsername("rabbit"); // 用户名factory.setPassword("rabbit"); //认密码// 2. 创建连接和信道// 使用try-with-resources确保资源自动关闭try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 3. 声明一个队列。参数:队列名,是否持久化,是否独占,是否自动删除,其他参数channel.queueDeclare("hello", true, false, false, null);// 4. 准备消息String message = "nihao";// 5. 发送消息到默认交换机,路由键为队列名channel.basicPublish("", "hello", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}// 6. 连接和通道会自动关闭}
}

        消费者:消费者中需要考虑消费失败、重试、死信队列等场景,收到消息之后如果失败,则将消息的header中添加失败的次数,然后再将消息放入到队列中,再次消费这个消息时判断header中的次数是否超过指定值,如果超过放入到死信队列

        消费者demo:

public class Consumer {public static void main(String[] args) throws Exception {// 1. 创建连接工厂(配置同生产者)ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.187.133");factory.setUsername("rabbit");factory.setPassword("rabbit");// 2. 建立连接和信道Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.basicQos(10); // 消费者端最多存在未提交数据的条数,默认1条// 3. 声明队列(必须与生产者声明的队列一致)channel.queueDeclare("hello", true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 在消息头中记录重试次数Map<String, Object> headers = new HashMap<>();headers.put("retry-count", 0);// 4. 创建回调函数,定义如何处理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {String message = new String(delivery.getBody(), "UTF-8");// 获取消息头中的重试次数Map<String, Object> headersData = delivery.getProperties().getHeaders();Integer retryCount = (Integer) headersData.getOrDefault("retry-count", 0);if (retryCount >= 3) {// 超过最大重试次数,进入死信队列channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);System.out.println("消息重试超过3次,进入死信队列");return;}// 处理消息System.out.println(" [x] Received '" + message + "'");// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {// 增加重试次数并重新入队AMQP.BasicProperties props = delivery.getProperties();Map<String, Object> newHeaders = new HashMap<>(props.getHeaders());Integer currentRetry = (Integer) newHeaders.getOrDefault("retry-count", 0);newHeaders.put("retry-count", currentRetry + 1);AMQP.BasicProperties newProps = new AMQP.BasicProperties.Builder().headers(newHeaders).build();// 重新发布消息(带有更新后的重试次数)channel.basicPublish("", "hello", newProps, delivery.getBody());channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};}
}

3、rabbitMQ的结构

        从以上代码中可以看出,生产者、消费者连接rabbitMQ后生成一个connection,里面有一个channel,使用channel往一个队列中写、读数据,没有kafka中的topic、tag等;

        当多个消费者消费某个队列的消息时,服务端使用轮询将消息推送到多个消费者,消费者是被动响应的,不需要向服务端发起拉消息的请求;

4、rabbitMQ 的集群模式

        主备:也就是一主多从

        镜像模式:多主多从,也是最常用的模式

        从节点只是用于备份数据和选举产生主节点,所有的客户端的读写操作都在主节点队列上;

        factory.setHost("192.168.187.133"); 中可以设置多个ip,客户端顺序连接对应的服务端节点,连接成功为止,客户端和服务端都只会和其中一个ip建立连接;

        当主节点挂了,从节点选举的优先级是 和主节点数据是否完全同步、节点加入集群的先后顺序;

5、延时消息

      rabbitMQ可以通过死信交换机实现延时 消息,代码不做说明,知道有这个事就行;

6、心跳机制:

       生产者和消费者和服务端通过心跳确认节点状态,当生成者、消费者发送心跳后一段时间没有收到返回 会认为连接的节点宕机,重新换一个节点连接;当服务端没有收到消费者节点的心跳,认为这个消费者不可用触发重平衡,不再向这个消费者推送消息;

7、应用特点

      概念上的轻量化:没有kafka和rocketMQ 中的topic、tag,topic下面有多个分区或队列,结构简单;

      灵活路由:需要给某个业务发消息直接将数据写入到对应队列即可,强调在队列上的灵活读写

http://www.dtcms.com/a/511444.html

相关文章:

  • 华为Java专业级科目一通过心得
  • [Android] AutoCMD+ v.1.3.5:安卓自动化工具
  • 从养殖场到实验室:小吉快检BL-08plus如何实现禽病检测效率提升300%?——真实案例深度解析
  • 衡阳手机网站建设外发加工费计入什么科目
  • 【JUnit实战3_06】第三章:JUnit 的体系结构(下)
  • 使用injected Provider在remix中调试合约的坑 -- 时间(或者最新块)更新不及时
  • 丽水市莲都建设分局网站湖南微网站开发
  • 笔试-最小组合数
  • Web UI自动化时,通过autoIT的解决window控件
  • 电商网站建设建议网站前端交互功能案例分析
  • Qt——窗口
  • [人工智能-大模型-20]:对比 Copilot 与国产替代方案(如通义灵码、百度Comate)
  • c语言和网站建设的关系平台网站开发可行性分析
  • gcc编译的过程及每个过程的作用
  • ROS2[Humble] -- URDF Tutorial- 02-multipleshapes
  • C#实现二维码和条形码生成与打印
  • C#WPF如何跳转页面
  • 【高并发服务器】八、Poller描述符监控类实现
  • 用vs2013网站开发四川最好的网络优化公司
  • 如何开发一个 IDEA 插件通过 Ollama 调用大模型为方法生成仙侠风格的注释
  • 【论文精读】Latent-Shift:基于时间偏移模块的高效文本生成视频技术
  • unity基础学习笔记<上>
  • C# WPF Dragablz使用记录 TabControl选项卡可拖拽为单独界面或停靠
  • 机器人场景落地步入技术验证阶段,微美全息加快创新势能探索AI多元路径变革
  • YOLOv4 核心内容笔记
  • 网站开发工程师待遇家庭网站建设
  • 医疗门户网站模板wordpress3.8
  • iOS的多线程下数据安全和内存泄漏以及工具使用监测内存泄漏
  • 『CMake』关于使用CMake构建项目时的现代/传统指令
  • 请被人做网站怎么做倒计时网站