go 使用rabbitMQ
为了简单,我们使用docker 容器开启rabbitmq作为服务
1 安装centos docker
1. 卸载旧版本(如有)
sudo yum remove -y docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine
2. 安装依赖包
sudo yum install -y yum-utils \device-mapper-persistent-data \lvm2
3. 添加 Docker 官方 YUM 仓库
sudo yum-config-manager \--add-repo \https://download.docker.com/linux/centos/docker-ce.repo
如果你访问国外镜像很慢,可以使用国内镜像,例如阿里
- 修改 Docker 仓库地址为腾讯镜像
sudo sed -i 's+https://download.docker.com+https://mirrors.tencent.com/docker-ce+' /etc/yum.repos.d/docker-ce.repo
- 验证是否替换成功
cat /etc/yum.repos.d/docker-ce.repo```
- 清理缓存并重新安装
# 清理旧缓存
sudo yum clean all# 生成新的元数据缓存(缓存所有的yum元数据,时间很长,很慢)
sudo yum makecache# 缓存指定的yum源,推荐
# --disablerepo=*:禁用所有仓库
# --enablerepo=docker-ce-stable:只启用 docker-ce-stable
# --skip-unavailable:跳过无法访问的仓库(避免报错)
sudo yum makecache --enablerepo=docker-ce-stable --disablerepo=* --skip-unavailable# 安装 Docker
sudo yum install -y docker-ce docker-ce-cli containerd.io
4. 安装 Docker Engine
sudo yum install -y docker-ce docker-ce-cli containerd.io
5. 启动并启用 Docker 服务
# 启动 Docker
sudo systemctl start docker# 设置开机自启
sudo systemctl enable docker
6. 验证安装
sudo docker --version
运行docker rabbitmq
设置国内镜像
需要国内镜像
修改 Docker 配置文件(推荐)
很多镜像无法使用,的用心找下,不然也无法下载镜像
# 1. 创建或编辑配置文件
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{"registry-mirrors": ["https://docker-0.unsee.tech"]
}
EOF
# 重新加载配置
sudo systemctl daemon-reload# 重启 Docker
sudo systemctl restart docker# 查看是否生效
docker info | grep -A 2 "Registry Mirrors"
运行rabbitMQ
如果执行不成功,无法下载,则可能你没有公共的DNS,添加
两种方式
方式1,vi打开 /etc/resolv.conf
search lan
nameserver 192.168.111.1
nameserver 8.8.8.8
nameserver 114.114.114.114
方式2
# 编辑 resolv.conf,添加公共 DNS
sudo tee /etc/resolv.conf <<-'EOF'
nameserver 8.8.8.8
nameserver 114.114.114.114
options timeout:1
EOF
docker run -d \--name rabbitmq \--hostname rabbitmq \-p 5672:5672 \-p 5671:5671 \-p 15672:15672 \-p 15671:15671 \-p 15692:15692 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=SecurePass123! \-e RABBITMQ_DEFAULT_VHOST=my_vhost \--mount source=rabbitmq_data,target=/var/lib/rabbitmq \--mount source=rabbitmq_log,target=/var/log/rabbitmq \--restart=unless-stopped \rabbitmq:3.13-management
启动成功查看
访问
http://192.168.126.3:15672/#/
admin
SecurePass123!
go代码实现
五种消息模型
简单队列:点对点
特点:一个生产者发送消息到队列,仅有一个消费者接收并处理消息。
适用场景:一对一的简单通信场景。
下面的代码只有一个生产者,一个消费者
package mainimport ("log""strconv""sync""time""github.com/streadway/amqp"
)func main() {// 1. 连接 RabbitMQconn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}defer conn.Close()// 2. 创建 channelch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()// 3. 声明队列(确保队列存在)queue, err := ch.QueueDeclare("Q1", // 队列名true, // 持久化:重启后队列仍存在false, // 自动删除:当最后一个消费者断开时,不自动删除false, // 排他性:非排他,其他连接也可使用false, // 阻塞:不阻塞nil, // 额外参数)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 4. 启动多个消费者(使用 WaitGroup 等待)var wg sync.WaitGroupwg.Add(1) // 启动 1 个消费者go func() {defer wg.Done()consume("consumer-1", conn) // 每个消费者使用自己的 channel}()// 5. 生产者:发送消息go func() {i := 0for {str := "Hello World" + strconv.Itoa(i)err := ch.Publish("", // 交换机:默认交换机queue.Name, // 路由键:队列名false, // mandatory:如果没有匹配的队列,不返回消息false, // immediate:不立即投递amqp.Publishing{ContentType: "text/plain",Body: []byte(str),},)if err != nil {log.Printf("Failed to publish a message: %v", err)return}i++time.Sleep(200 * time.Millisecond)}}()// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)wg.Wait() // 这里不会退出,除非消费者退出
}// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("[%s] Failed to open channel: %v", consumerName, err)return}defer ch.Close()// 消费消息msgs, err := ch.Consume("Q1", // 队列名consumerName, // 消费者标签(唯一标识)true, // autoAck:自动确认(消息处理完后自动从队列删除)false, // exclusive:非排他false, // noLocal:不接收本连接发送的消息(AMQP 未强制支持)false, // noWait:不等待服务器确认nil, // args)if err != nil {log.Printf("[%s] Failed to consume: %v", consumerName, err)return}// 持续接收消息for msg := range msgs {// 使用锁或 log 打印,避免并发输出乱序log.Printf("[%s] Received: %s", consumerName, msg.Body)// 模拟处理时间time.Sleep(100 * time.Millisecond)}log.Printf("[%s] Consumer stopped.", consumerName)
}
工作队列模式
特点:多个消费者共享同一队列,消息按轮询(默认)或公平分配(能者多劳)机制处理。
适用场景:任务分发与负载均衡,如分布式任务处理
仅仅是在简单队列基础上,增加了一个消费队列,两个队列轮流消费
go func() {defer wg.Done()consume("consumer-1", conn) // 每个消费者使用自己的 channel}()go func() {defer wg.Done()consume("consumer-2", conn)}()
完整版
package mainimport ("log""strconv""sync""time""github.com/streadway/amqp"
)func main() {// 1. 连接 RabbitMQconn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}defer conn.Close()// 2. 创建 channelch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()// 3. 声明队列(确保队列存在)queue, err := ch.QueueDeclare("Q1", // 队列名true, // 持久化:重启后队列仍存在false, // 自动删除:当最后一个消费者断开时,不自动删除false, // 排他性:非排他,其他连接也可使用false, // 阻塞:不阻塞nil, // 额外参数)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 4. 启动多个消费者(使用 WaitGroup 等待)var wg sync.WaitGroupwg.Add(2) // 启动 2 个消费者go func() {defer wg.Done()consume("consumer-1", conn) // 每个消费者使用自己的 channel}()go func() {defer wg.Done()consume("consumer-2", conn)}()// 5. 生产者:发送消息go func() {i := 0for {str := "Hello World" + strconv.Itoa(i)err := ch.Publish("", // 交换机:默认交换机queue.Name, // 路由键:队列名false, // mandatory:如果没有匹配的队列,不返回消息false, // immediate:不立即投递amqp.Publishing{ContentType: "text/plain",Body: []byte(str),},)if err != nil {log.Printf("Failed to publish a message: %v", err)return}i++time.Sleep(200 * time.Millisecond)}}()// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)wg.Wait() // 这里不会退出,除非消费者退出
}// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("[%s] Failed to open channel: %v", consumerName, err)return}defer ch.Close()// 消费消息msgs, err := ch.Consume("Q1", // 队列名consumerName, // 消费者标签(唯一标识)true, // autoAck:自动确认(消息处理完后自动从队列删除)false, // exclusive:非排他false, // noLocal:不接收本连接发送的消息(AMQP 未强制支持)false, // noWait:不等待服务器确认nil, // args)if err != nil {log.Printf("[%s] Failed to consume: %v", consumerName, err)return}// 持续接收消息for msg := range msgs {// 使用锁或 log 打印,避免并发输出乱序log.Printf("[%s] Received: %s", consumerName, msg.Body)// 模拟处理时间time.Sleep(100 * time.Millisecond)}log.Printf("[%s] Consumer stopped.", consumerName)
}
发布/订阅模式
特点:生产者通过交换机将消息广播至所有绑定该交换机的队列,每个队列对应一个消费者。
适用场景:消息多副本分发,如日志同步。
fanout 模式
package mainimport ("log""strconv""sync""time""github.com/streadway/amqp"
)const exchangeName = "go-exchange"func main() {// 1. 连接 RabbitMQconn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}defer conn.Close()// 2. 创建 channelch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()err = ch.ExchangeDeclare(exchangeName, // 交换机名:默认交换机"fanout", // 类型:直连 direct, topic , headers, fanouttrue, // 持久化:重启后交换机仍存在false, // 自动删除:当最后一个绑定断开时,不自动删除false, // 内部:不用于客户端应用false, // 无等待:不等待服务器确认nil, // 额外参数)if err != nil {log.Fatalf("Failed to declare an exchange: %v", err)}// 4.var wg sync.WaitGroupwg.Add(2) // 启动 2 个消费者go func() {defer wg.Done()subscribe(conn, exchangeName) // 每个消费者使用自己的 channel}()go func() {defer wg.Done()subscribe(conn, exchangeName) // 每个消费者使用自己的 channel}()// 5. 生产者:发送消息go func() {i := 0for {str := "Hello World" + strconv.Itoa(i)err := ch.Publish(exchangeName, // 交换机:默认交换机"", // routing key:路由键,fanout 类型不需要false, // mandatory:如果没有匹配的队列,不返回消息false, // immediate:不立即投递amqp.Publishing{ContentType: "text/plain",Body: []byte(str),},)if err != nil {log.Printf("Failed to publish a message: %v", err)return}i++time.Sleep(200 * time.Millisecond)}}()// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)wg.Wait() // 这里不会退出,除非消费者退出
}func subscribe(conn *amqp.Connection, exchangeName string) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("Failed to open channel: %v", err)return}defer ch.Close()// 声明队列(确保队列存在)queue, err := ch.QueueDeclare("", // 队列名false, // 持久化:重启后队列仍存在true, // 自动删除:当最后一个消费者断开时,不自动删除true, // 排他性:非排他,其他连接也可使用false, // 阻塞:不阻塞nil, // 额外参数)if err != nil {log.Printf("Failed to declare a queue: %v", err)return}// 绑定队列到交换机err = ch.QueueBind(queue.Name, // 队列名"", // 路由键:fanout 类型不需要exchangeName, // 交换机名false, // 不等待服务器确认nil, // 额外参数)defer ch.QueueDelete(queue.Name, false, false, false)//调用消费者consume("consumer-haha", conn, queue.Name)}// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection, queueName string) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("[%s] Failed to open channel: %v", consumerName, err)return}defer ch.Close()// 消费消息msgs, err := ch.Consume(queueName, // 队列名consumerName, // 消费者标签(唯一标识)true, // autoAck:自动确认(消息处理完后自动从队列删除)false, // exclusive:非排他false, // noLocal:不接收本连接发送的消息(AMQP 未强制支持)false, // noWait:不等待服务器确认nil, // args)if err != nil {log.Printf("[%s] Failed to consume: %v", consumerName, err)return}// 持续接收消息for msg := range msgs {// 使用锁或 log 打印,避免并发输出乱序log.Printf("[%s] Received: %s", consumerName, msg.Body)// 模拟处理时间time.Sleep(100 * time.Millisecond)}log.Printf("[%s] Consumer stopped.", consumerName)
}
每次获取两条消息,这是因为有两个subscribe()携程在收消息