当前位置: 首页 > news >正文

go 使用rabbitMQ

为了简单,我们使用docker 容器开启rabbitmq作为服务

1 安装centos docker

1. 卸载旧版本(如有)

sudo yum remove -y docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine

2. 安装依赖包

sudo yum install -y yum-utils \device-mapper-persistent-data \lvm2

3. 添加 Docker 官方 YUM 仓库

sudo yum-config-manager \--add-repo \https://download.docker.com/linux/centos/docker-ce.repo

如果你访问国外镜像很慢,可以使用国内镜像,例如阿里

  1. 修改 Docker 仓库地址为腾讯镜像
sudo sed -i 's+https://download.docker.com+https://mirrors.tencent.com/docker-ce+' /etc/yum.repos.d/docker-ce.repo
  1. 验证是否替换成功
cat /etc/yum.repos.d/docker-ce.repo```
  1. 清理缓存并重新安装
# 清理旧缓存
sudo yum clean all# 生成新的元数据缓存(缓存所有的yum元数据,时间很长,很慢)
sudo yum makecache# 缓存指定的yum源,推荐
# --disablerepo=*:禁用所有仓库
# --enablerepo=docker-ce-stable:只启用 docker-ce-stable
# --skip-unavailable:跳过无法访问的仓库(避免报错)
sudo  yum makecache --enablerepo=docker-ce-stable --disablerepo=* --skip-unavailable# 安装 Docker
sudo yum install -y docker-ce docker-ce-cli containerd.io

4. 安装 Docker Engine

sudo yum install -y docker-ce docker-ce-cli containerd.io

5. 启动并启用 Docker 服务

# 启动 Docker
sudo systemctl start docker# 设置开机自启
sudo systemctl enable docker

6. 验证安装

sudo docker --version

运行docker rabbitmq

设置国内镜像

需要国内镜像
修改 Docker 配置文件(推荐)
很多镜像无法使用,的用心找下,不然也无法下载镜像

# 1. 创建或编辑配置文件
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{"registry-mirrors": ["https://docker-0.unsee.tech"]
}
EOF
# 重新加载配置
sudo systemctl daemon-reload# 重启 Docker
sudo systemctl restart docker# 查看是否生效
docker info | grep -A 2 "Registry Mirrors"

运行rabbitMQ

如果执行不成功,无法下载,则可能你没有公共的DNS,添加
两种方式
方式1,vi打开 /etc/resolv.conf

search lan
nameserver 192.168.111.1
nameserver 8.8.8.8 
nameserver 114.114.114.114

方式2

# 编辑 resolv.conf,添加公共 DNS
sudo tee /etc/resolv.conf <<-'EOF'
nameserver 8.8.8.8
nameserver 114.114.114.114
options timeout:1
EOF
docker run -d \--name rabbitmq \--hostname rabbitmq \-p 5672:5672 \-p 5671:5671 \-p 15672:15672 \-p 15671:15671 \-p 15692:15692 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=SecurePass123! \-e RABBITMQ_DEFAULT_VHOST=my_vhost \--mount source=rabbitmq_data,target=/var/lib/rabbitmq \--mount source=rabbitmq_log,target=/var/log/rabbitmq \--restart=unless-stopped \rabbitmq:3.13-management

启动成功查看
在这里插入图片描述
访问

http://192.168.126.3:15672/#/
admin
SecurePass123!

go代码实现

五种消息模型

简单队列:点对点

‌特点‌:一个生产者发送消息到队列,仅有一个消费者接收并处理消息。 ‌
‌适用场景‌:一对一的简单通信场景。

下面的代码只有一个生产者,一个消费者

package mainimport ("log""strconv""sync""time""github.com/streadway/amqp"
)func main() {// 1. 连接 RabbitMQconn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}defer conn.Close()// 2. 创建 channelch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()// 3. 声明队列(确保队列存在)queue, err := ch.QueueDeclare("Q1",  // 队列名true,  // 持久化:重启后队列仍存在false, // 自动删除:当最后一个消费者断开时,不自动删除false, // 排他性:非排他,其他连接也可使用false, // 阻塞:不阻塞nil,   // 额外参数)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 4. 启动多个消费者(使用 WaitGroup 等待)var wg sync.WaitGroupwg.Add(1) // 启动 1 个消费者go func() {defer wg.Done()consume("consumer-1", conn) // 每个消费者使用自己的 channel}()// 5. 生产者:发送消息go func() {i := 0for {str := "Hello World" + strconv.Itoa(i)err := ch.Publish("",         // 交换机:默认交换机queue.Name, // 路由键:队列名false,      // mandatory:如果没有匹配的队列,不返回消息false,      // immediate:不立即投递amqp.Publishing{ContentType: "text/plain",Body:        []byte(str),},)if err != nil {log.Printf("Failed to publish a message: %v", err)return}i++time.Sleep(200 * time.Millisecond)}}()// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)wg.Wait() // 这里不会退出,除非消费者退出
}// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("[%s] Failed to open channel: %v", consumerName, err)return}defer ch.Close()// 消费消息msgs, err := ch.Consume("Q1",         // 队列名consumerName, // 消费者标签(唯一标识)true,         // autoAck:自动确认(消息处理完后自动从队列删除)false,        // exclusive:非排他false,        // noLocal:不接收本连接发送的消息(AMQP 未强制支持)false,        // noWait:不等待服务器确认nil,          // args)if err != nil {log.Printf("[%s] Failed to consume: %v", consumerName, err)return}// 持续接收消息for msg := range msgs {// 使用锁或 log 打印,避免并发输出乱序log.Printf("[%s] Received: %s", consumerName, msg.Body)// 模拟处理时间time.Sleep(100 * time.Millisecond)}log.Printf("[%s] Consumer stopped.", consumerName)
}

工作队列模式

‌特点‌:多个消费者共享同一队列,消息按轮询(默认)或公平分配(能者多劳)机制处理。 ‌
‌适用场景‌:任务分发与负载均衡,如分布式任务处理

仅仅是在简单队列基础上,增加了一个消费队列,两个队列轮流消费

	go func() {defer wg.Done()consume("consumer-1", conn) // 每个消费者使用自己的 channel}()go func() {defer wg.Done()consume("consumer-2", conn)}()

完整版

package mainimport ("log""strconv""sync""time""github.com/streadway/amqp"
)func main() {// 1. 连接 RabbitMQconn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}defer conn.Close()// 2. 创建 channelch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()// 3. 声明队列(确保队列存在)queue, err := ch.QueueDeclare("Q1",  // 队列名true,  // 持久化:重启后队列仍存在false, // 自动删除:当最后一个消费者断开时,不自动删除false, // 排他性:非排他,其他连接也可使用false, // 阻塞:不阻塞nil,   // 额外参数)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 4. 启动多个消费者(使用 WaitGroup 等待)var wg sync.WaitGroupwg.Add(2) // 启动 2 个消费者go func() {defer wg.Done()consume("consumer-1", conn) // 每个消费者使用自己的 channel}()go func() {defer wg.Done()consume("consumer-2", conn)}()// 5. 生产者:发送消息go func() {i := 0for {str := "Hello World" + strconv.Itoa(i)err := ch.Publish("",         // 交换机:默认交换机queue.Name, // 路由键:队列名false,      // mandatory:如果没有匹配的队列,不返回消息false,      // immediate:不立即投递amqp.Publishing{ContentType: "text/plain",Body:        []byte(str),},)if err != nil {log.Printf("Failed to publish a message: %v", err)return}i++time.Sleep(200 * time.Millisecond)}}()// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)wg.Wait() // 这里不会退出,除非消费者退出
}// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("[%s] Failed to open channel: %v", consumerName, err)return}defer ch.Close()// 消费消息msgs, err := ch.Consume("Q1",         // 队列名consumerName, // 消费者标签(唯一标识)true,         // autoAck:自动确认(消息处理完后自动从队列删除)false,        // exclusive:非排他false,        // noLocal:不接收本连接发送的消息(AMQP 未强制支持)false,        // noWait:不等待服务器确认nil,          // args)if err != nil {log.Printf("[%s] Failed to consume: %v", consumerName, err)return}// 持续接收消息for msg := range msgs {// 使用锁或 log 打印,避免并发输出乱序log.Printf("[%s] Received: %s", consumerName, msg.Body)// 模拟处理时间time.Sleep(100 * time.Millisecond)}log.Printf("[%s] Consumer stopped.", consumerName)
}

发布/订阅模式

‌特点‌:生产者通过交换机将消息广播至所有绑定该交换机的队列,每个队列对应一个消费者。 ‌
‌适用场景‌:消息多副本分发,如日志同步。

fanout 模式

package mainimport ("log""strconv""sync""time""github.com/streadway/amqp"
)const exchangeName = "go-exchange"func main() {// 1. 连接 RabbitMQconn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}defer conn.Close()// 2. 创建 channelch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()err = ch.ExchangeDeclare(exchangeName, // 交换机名:默认交换机"fanout",     // 类型:直连 direct,  topic , headers, fanouttrue,         // 持久化:重启后交换机仍存在false,        // 自动删除:当最后一个绑定断开时,不自动删除false,        // 内部:不用于客户端应用false,        // 无等待:不等待服务器确认nil,          // 额外参数)if err != nil {log.Fatalf("Failed to declare an exchange: %v", err)}// 4.var wg sync.WaitGroupwg.Add(2) // 启动 2 个消费者go func() {defer wg.Done()subscribe(conn, exchangeName) // 每个消费者使用自己的 channel}()go func() {defer wg.Done()subscribe(conn, exchangeName) // 每个消费者使用自己的 channel}()// 5. 生产者:发送消息go func() {i := 0for {str := "Hello World" + strconv.Itoa(i)err := ch.Publish(exchangeName, // 交换机:默认交换机"",           //  routing key:路由键,fanout 类型不需要false,        // mandatory:如果没有匹配的队列,不返回消息false,        // immediate:不立即投递amqp.Publishing{ContentType: "text/plain",Body:        []byte(str),},)if err != nil {log.Printf("Failed to publish a message: %v", err)return}i++time.Sleep(200 * time.Millisecond)}}()// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)wg.Wait() // 这里不会退出,除非消费者退出
}func subscribe(conn *amqp.Connection, exchangeName string) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("Failed to open channel: %v", err)return}defer ch.Close()// 声明队列(确保队列存在)queue, err := ch.QueueDeclare("",    //  队列名false, // 持久化:重启后队列仍存在true,  // 自动删除:当最后一个消费者断开时,不自动删除true,  // 排他性:非排他,其他连接也可使用false, // 阻塞:不阻塞nil,   // 额外参数)if err != nil {log.Printf("Failed to declare a queue: %v", err)return}// 绑定队列到交换机err = ch.QueueBind(queue.Name,   // 队列名"",           // 路由键:fanout 类型不需要exchangeName, // 交换机名false,        // 不等待服务器确认nil,          // 额外参数)defer ch.QueueDelete(queue.Name, false, false, false)//调用消费者consume("consumer-haha", conn, queue.Name)}// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection, queueName string) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("[%s] Failed to open channel: %v", consumerName, err)return}defer ch.Close()// 消费消息msgs, err := ch.Consume(queueName,    // 队列名consumerName, // 消费者标签(唯一标识)true,         // autoAck:自动确认(消息处理完后自动从队列删除)false,        // exclusive:非排他false,        // noLocal:不接收本连接发送的消息(AMQP 未强制支持)false,        // noWait:不等待服务器确认nil,          // args)if err != nil {log.Printf("[%s] Failed to consume: %v", consumerName, err)return}// 持续接收消息for msg := range msgs {// 使用锁或 log 打印,避免并发输出乱序log.Printf("[%s] Received: %s", consumerName, msg.Body)// 模拟处理时间time.Sleep(100 * time.Millisecond)}log.Printf("[%s] Consumer stopped.", consumerName)
}

每次获取两条消息,这是因为有两个subscribe()携程在收消息
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

http://www.dtcms.com/a/357298.html

相关文章:

  • 神经网络|(十六)概率论基础知识-伽马函数·中
  • Hugging Face入门指南:AI创客的数字游乐场
  • 解析json
  • LeetCode 142.环形链表 II
  • 【前端教程】JavaScript 数组对象遍历与数据展示实战
  • 动态规划01背包
  • 解锁Libvio访问异常:从故障到修复的全攻略
  • 从“Where”到“Where + What”:语义多目标跟踪(SMOT)全面解读
  • C# 日志写入loki
  • 海外广告流量套利:为什么需要使用移动代理IP?
  • 接吻数问题:从球体堆叠到高维空间的数学奥秘
  • 告别K8s部署繁琐!用KubeOperator可视化一键搭建生产级集群
  • 玄机靶场 | 冰蝎3.0-jsp流量分析
  • ACID分别如何实现
  • Dockerfile实现java容器构建及项目重启(公网和内网)
  • SOME/IP-SD IPv4组播的通信参数由谁指定?
  • React学习教程,从入门到精通, ReactJS - 特性:初学者的指南(4)
  • C++链表双杰:list与forward_list
  • ElasticSearch对比Solr
  • Node.js 的流(Stream)是什么?有哪些类型?
  • DQL单表查询相关函数
  • STM32F2/F4系列单片机解密和芯片应用介绍
  • Ubuntu虚拟机磁盘空间扩展指南
  • AI视频安防,为幼儿园安全保驾护航
  • 基于 GPT-OSS 的成人自考口语评测 API 开发全记录
  • 深度解密SWAT模型:遥感快速建模、DEM/LU/气象数据不确定性、子流域/坡度划分、未来土地利用与气候变化情景模拟及措施效益评估
  • 龙巍:探究青铜器在木雕中的运用
  • VS Code C#调试完全指南
  • [AI人脸替换] docs | 环境部署指南 | 用户界面解析
  • 红色视频剪辑制作——走进广州农讲所:在红墙黄瓦间感悟初心与传承