当前位置: 首页 > news >正文

004 rocketmq集群

1、集群模式
在RocketMQ中,集群的部署模式是比较多的,有以下几种:



public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
        consumer.setNamesrvAddr("172.16.55.185:9876");
        // 订阅topic,接收此Topic下的所有消息
        consumer.subscribe("my-test-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        System.out.println(new String(msg.getBody(), "UTF-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("收到消息->" + msgs);
                if (msgs.get(0).getReconsumeTimes() >= 3) {
                    // 重试3次后,不再进行重试
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
    }
}

单个Master 这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。 多
Master模式 一个集群无Slave,全是Master,例如2个Master或者3个Master 单台机器宕机期间,这台机器上未被消
费的消息在机器恢复之前不可订阅,消息实时性会受到影响。 多Master多Slave模式,异步复制 每个Master配置一
个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒 级。 优点:即使磁盘损坏,消
息丢失的非常少,且消息实时性不会受影响,因为Master宕机后,消费者仍然 可以从Slave消费,此过程对应用透
明,不需要人工干预。性能同多Master模式几乎一样。 缺点:Master宕机,磁盘损坏情况,会丢失少量消息。 多
Master多Slave模式,同步双写 每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写
成功,向应用返回 成功。 优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性
都非常高。 缺点:性能比异步复制模式略低,大约低10%左右。

2、搭建2m2s集群
下面通过docker搭建2master+2slave的集群。
#创建2个master
#nameserver1

docker create -p 9876:9876 --name rmqserver01 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
foxiswho/rocketmq:server-4.5.1

#nameserver2

docker create -p 9877:9876 --name rmqserver02 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
foxiswho/rocketmq:server-4.5.1

#创建第1个master broker
#master broker01

docker create --net host --name rmqbroker01 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
foxiswho/rocketmq:broker-4.5.1

#配置

namesrvAddr=172.16.55.185:9876;172.16.55.185:9877
brokerClusterName=testCluster
brokerName=broker01
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=172.16.55.185
brokerIp2=172.16.55.185
listenPort=10911

#创建第2个master broker
#master broker02

docker create --net host --name rmqbroker02 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
foxiswho/rocketmq:broker-4.5.1

#master broker02

namesrvAddr=172.16.55.185:9876;172.16.55.185:9877
brokerClusterName=testCluster
brokerName=broker02
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=172.16.55.185
brokerIp2=172.16.55.185
listenPort=10811

#创建第1个slave broker
#slave broker01

docker create --net host --name rmqbroker03 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
foxiswho/rocketmq:broker-4.5.1

#slave broker01

namesrvAddr=172.16.55.185:9876;172.16.55.185:9877
brokerClusterName=testCluster
brokerName=broker01
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=172.16.55.185
brokerIp2=172.16.55.185
listenPort=10711

#创建第2个slave broker
#slave broker01

docker create --net host --name rmqbroker04 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
foxiswho/rocketmq:broker-4.5.1

#slave broker02

namesrvAddr=172.16.55.185:9876;172.16.55.185:9877
brokerClusterName=testCluster
brokerName=broker02
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=172.16.55.185
brokerIp2=172.16.55.185
listenPort=10611

#启动容器

docker start rmqserver01 rmqserver02
docker start rmqbroker01 rmqbroker02 rmqbroker03 rmqbroker04

相关文章:

  • C++杂记——RTTI
  • PageHelper新发现
  • list的模拟实现
  • P2P 下载科普:原理与应用
  • 三数之和_算法
  • 期权学习与期权异动
  • iOS 使用消息转发机制实现多代理功能
  • 如何将Vue项目部署至 nginx
  • Trae智能协作AI编程工具IDE:如何在MacBook Pro下载、安装和配置使用Trae?
  • DeepSeek 与大数据治理:AI 赋能数据管理的未来
  • W3C标准和ES规范之一文通
  • FPGA开发,使用Deepseek V3还是R1(3):系统级与RTL级
  • SpringBoot 端口配置
  • nlp第七节——文本匹配任务
  • 下载 MindSpore 配置 PyTorch环境
  • 使用消息队列怎样防止消息重复?
  • 苹果廉价机型 iPhone 16e 影像系统深度解析
  • Rust ~ Dyn Error
  • C语言:质因数分解
  • SpringCloud基础学习
  • “80后”萍乡市安源区区长邱伟,拟任县(区)委书记
  • 俄媒:俄乌伊斯坦布尔谈判将于北京时间今天17时30分开始
  • 上海虹桥国际咖啡文化节周五开幕,来看Coffeewalk通关攻略
  • 台行政机构网站删除“汉人”改为“其余人口”,国台办回应
  • 市场监管总局等五部门约谈外卖平台企业
  • 中保协发布《保险机构适老服务规范》,全面规范保险机构面向老年人提供服务的统一标准