当前位置: 首页 > wzjs >正文

门户网站建设为企业带来的好处网站开发页面布局

门户网站建设为企业带来的好处,网站开发页面布局,安徽网新科技有限公司怎么样,福州seo网站排名一、RocketMQ 概述 1.1 什么是 RocketMQ RocketMQ 是阿里巴巴开源的一款分布式消息中间件,后捐赠给 Apache 基金会成为顶级项目。它具有低延迟、高并发、高可用、高可靠等特点,广泛应用于订单交易、消息推送、流计算、IoT 等场景。 1.2 核心特性 高吞…

一、RocketMQ 概述

1.1 什么是 RocketMQ

RocketMQ 是阿里巴巴开源的一款分布式消息中间件,后捐赠给 Apache 基金会成为顶级项目。它具有低延迟、高并发、高可用、高可靠等特点,广泛应用于订单交易、消息推送、流计算、IoT 等场景。

1.2 核心特性

  • 高吞吐量:单机支持 10 万级 TPS
  • 低延迟:毫秒级消息投递
  • 高可用:主从架构,支持多副本
  • 消息可靠:支持消息持久化、事务消息
  • 扩展性强:支持集群部署,可水平扩展
  • 丰富的消息类型:顺序消息、定时消息、事务消息等

二、RocketMQ 核心架构

2.1 架构组成

RocketMQ 由四个核心组件构成:

  1. NameServer:轻量级注册中心,负责 Broker 的注册与发现
  2. Broker:消息存储与转发服务器
  3. Producer:消息生产者
  4. Consumer:消息消费者
[Producer]  →  [NameServer]  ←  [Consumer]↓                ↑[Broker] ←→ [Broker]

2.2 核心概念

  • Topic:消息主题,一级消息类型
  • Message Queue:消息队列,Topic 的分区单位
  • Tag:消息标签,二级消息类型
  • Group:生产者/消费者组
  • Offset:消息在队列中的位置标识

三、消息生产与消费

3.1 消息发送模式

3.1.1 同步发送
public class SyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 同步发送,等待Broker返回结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);producer.shutdown();}
}
3.1.2 异步发送
public class AsyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));// 异步发送,设置回调函数producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("Send success: %s%n", sendResult);}@Overridepublic void onException(Throwable e) {System.out.printf("Send failed: %s%n", e);}});Thread.sleep(5000); // 等待回调完成producer.shutdown();}
}
3.1.3 单向发送
// 只发送消息,不关心结果
producer.sendOneway(msg);

3.2 消息消费模式

3.2.1 集群消费(CLUSTERING)
public class ClusterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");// 集群模式(默认)consumer.setMessageModel(MessageModel.CLUSTERING);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}
3.2.2 广播消费(BROADCASTING)
// 广播模式(所有消费者都收到全量消息)
consumer.setMessageModel(MessageModel.BROADCASTING);

四、高级特性

4.1 顺序消息

生产者

// 确保相同订单ID的消息发送到同一个队列
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}
}, orderId); // orderId作为选择队列的依据

消费者

consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 保证顺序消费return ConsumeOrderlyStatus.SUCCESS;}
});

4.2 事务消息

public class TransactionProducer {public static void main(String[] args) throws Exception {TransactionMQProducer producer = new TransactionMQProducer("transaction_group");producer.setNamesrvAddr("localhost:9876");// 设置事务监听器producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务try {// 模拟业务处理System.out.println("Executing local transaction: " + msg);Thread.sleep(1000);return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 检查本地事务状态return LocalTransactionState.COMMIT_MESSAGE;}});producer.start();Message msg = new Message("TransactionTopic", null, "Transaction Message".getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送事务消息TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(5000);producer.shutdown();}
}

4.3 延迟消息

// 设置延迟级别(1-18分别对应1s,5s,10s,30s,1m...2h)
msg.setDelayTimeLevel(3); // 10秒后投递

五、RocketMQ 集群部署

5.1 集群模式

  1. 单Master模式:开发测试用,不可靠
  2. 多Master模式:高性能,无单点故障
  3. 多Master多Slave模式(异步复制):性能与可靠性的平衡
  4. 多Master多Slave模式(同步双写):高可靠性,性能略低

5.2 部署建议

  • NameServer:至少2台,保证高可用
  • Broker
    • 生产环境推荐多Master多Slave
    • 每个Master配置至少1个Slave
    • 主从分布在不同的物理机器

六、性能优化

6.1 生产者优化

  1. 批量发送

    List<Message> messages = new ArrayList<>();
    // 添加多条消息
    producer.send(messages);
    
  2. 合理设置发送超时

    producer.setSendMsgTimeout(3000); // 3秒
    
  3. 关闭VIP通道(非阿里云环境):

    producer.setVipChannelEnabled(false);
    

6.2 消费者优化

  1. 增加消费线程数

    consumer.setConsumeThreadMin(20);
    consumer.setConsumeThreadMax(64);
    
  2. 设置批量消费

    consumer.setConsumeMessageBatchMaxSize(10); // 每次最多消费10条
    
  3. 跳过堆积消息(特殊场景):

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    

七、常见问题解决方案

7.1 消息重复消费

解决方案

  1. 消费端实现幂等处理
  2. 使用数据库唯一键约束
  3. 使用Redis等缓存记录已处理消息ID

7.2 消息堆积

解决方案

  1. 增加消费者实例
  2. 提高消费者并行度
  3. 优化消费逻辑,减少处理时间
  4. 临时扩容Topic队列数

7.3 消息丢失

预防措施

  1. 生产端使用同步发送+重试机制
  2. Broker配置同步刷盘
    flushDiskType=SYNC_FLUSH
    
  3. 主从同步双写模式

八、监控与管理

8.1 控制台部署

RocketMQ 提供可视化控制台,可监控:

  • 消息堆积情况
  • 消费者延迟
  • Broker运行状态
  • Topic/Queue分布

8.2 关键监控指标

  1. 生产消费TPS
  2. 消息堆积量
  3. 消费延迟时间
  4. Broker CPU/Memory
  5. 磁盘IO使用率

九、最佳实践

  1. Topic命名规范:业务线_子系统_功能,如"Trade_Order_Notify"
  2. Tag使用原则:细化消息分类,如"PaySuccess", “PayFailed”
  3. 消息大小控制:建议不超过1MB
  4. Producer/Consumer分组:按业务功能划分
  5. 消息Key设置:便于问题追踪
    msg.setKeys("ORDER_10086");
    

十、总结

RocketMQ 作为一款成熟的分布式消息中间件,在电商、金融、IoT等领域有着广泛应用。掌握其核心原理、部署架构和优化技巧,能够帮助开发者构建高性能、高可靠的消息系统。在实际应用中,需要根据业务场景合理选择消息模式,并做好监控与运维工作,确保消息系统的稳定运行。


文章转载自:

http://a44PEtuI.mLzyx.cn
http://txdaWuQ3.mLzyx.cn
http://Nm6cS4Pj.mLzyx.cn
http://udLzGAQq.mLzyx.cn
http://7ILHvTYz.mLzyx.cn
http://DJpPPGUN.mLzyx.cn
http://KhuQF8fa.mLzyx.cn
http://gT2JoKS1.mLzyx.cn
http://nZlHxOPE.mLzyx.cn
http://Ntu7cHLA.mLzyx.cn
http://xEjV24RR.mLzyx.cn
http://f3SvWluf.mLzyx.cn
http://ULFtiG9S.mLzyx.cn
http://zvlSrJmM.mLzyx.cn
http://40EyUTHZ.mLzyx.cn
http://kEFBgj6L.mLzyx.cn
http://87uvqMdf.mLzyx.cn
http://Jor8RdZU.mLzyx.cn
http://sQ3uUIvY.mLzyx.cn
http://XKWU9Swf.mLzyx.cn
http://wK9i0u3B.mLzyx.cn
http://gYEtPFNa.mLzyx.cn
http://7ViBOy9m.mLzyx.cn
http://RKQi71wq.mLzyx.cn
http://3zuugHcs.mLzyx.cn
http://8soGr1lG.mLzyx.cn
http://lgwFTo6J.mLzyx.cn
http://EndzUnMB.mLzyx.cn
http://XLTGvaGT.mLzyx.cn
http://ypkGDP77.mLzyx.cn
http://www.dtcms.com/wzjs/732669.html

相关文章:

  • 做个小型购物网站要多少钱简单的j网站建设方案书
  • 蓝色旅游资讯网站模板油漆企业网站要怎么做
  • 网站建设得多钱婚纱设计网站
  • 网站后台怎么控制包装设计网站是什么样子的
  • 深圳做网站公司哪家好云浮建设网站
  • 网站宣传怎么做广州品牌seo推广
  • 视频网站外链怎么做网站动态图标
  • 网站怎么做才能赚钱宁阳网站设计
  • 不同类型网站优势对建设网站未来发展的建议
  • 做短视频网站有流量吗深圳小程序公司
  • 做外贸生意上国外网站怎么拥有自己的网站
  • 网站建设优化价格网页制作与网站建设ppt
  • 建立网站要钱吗互联网网站如何做流量统计
  • 电子商务的网站开发网站策划书的内容
  • 刚做的网站怎么搜索不出来网站推广指标包括
  • 17网站一起做网店潮汕档口网站优化外包费用
  • 长春世邦做网站影视制作宣传片公司
  • 汕头网站建设维护wordpress网站导出
  • 做网站h5烟台快速建站公司
  • 网站建设计划建议wordpress创建网站
  • wordpress建站seo好做吗phpcms企业网站模板
  • 优秀材料写作网站建设一个视频教学网站
  • 小程序网站模板主题猫wordpress
  • 优化seo网站wordpress自定义短码
  • 做网站都需要什么资料视频网站模板下载
  • 网站建设与服务考试山东网站制作设计
  • 网站抬头怎么做怎么做王者荣耀网站
  • 网站炫酷首页如何做网站的百科
  • 江苏省 前置审批 网站网站开发与运营方向
  • 翻墙国外网站做兼职设计排版网站