当前位置: 首页 > wzjs >正文

营销型网站搭建的工作装修建材网站

营销型网站搭建的工作,装修建材网站,常熟市维摩剑门绿茶网站建设目标,东营造价信息网一、RocketMQ 概述RocketMQ 是阿里巴巴开源的一款分布式消息中间件,后捐赠给 Apache 基金会成为顶级项目。它具有低延迟、高并发、高可用、高可靠等特点,广泛应用于订单交易、消息推送、流计算、日志收集等场景。核心特点分布式架构:支持集群…

一、RocketMQ 概述

RocketMQ 是阿里巴巴开源的一款分布式消息中间件,后捐赠给 Apache 基金会成为顶级项目。它具有低延迟、高并发、高可用、高可靠等特点,广泛应用于订单交易、消息推送、流计算、日志收集等场景。

核心特点

  1. 分布式架构:支持集群部署,可水平扩展

  2. 高吞吐量:单机可支持10万级TPS

  3. 低延迟:毫秒级消息投递

  4. 高可用性:支持主从复制,自动故障转移

  5. 消息可靠性:支持消息持久化,确保不丢失

  6. 丰富的消息模式:支持普通消息、顺序消息、事务消息、定时消息等

二、核心概念

1. 基本组件

组件说明
NameServer轻量级注册中心,负责Broker的注册与发现
Broker消息存储与转发服务器,负责消息存储、投递和查询
Producer消息生产者,负责发送消息
Consumer消息消费者,负责消费消息
Topic消息主题,用于消息分类
Message Queue消息队列,Topic的分区单位
Tag消息标签,用于消息二级分类
Group生产者组/消费者组,用于集群管理

一、阿里云rocketMQ

使用阿里云 ONS SDK
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>2.0.5.Final</version> <!-- 推荐最新版本 -->
</dependency>

获取阿里云 RocketMQ 配置

  • Endpointhttp://{YourInstanceId}.mq-internet.aliyuncs.com:80

  • AccessKey:阿里云账号的 AccessKey ID 和 AccessKey Secret

  • Topic:消息主题(需在阿里云控制台创建)

  • Group ID:消费者组(需在控制台创建)

1、发消息

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;import java.util.Properties;public class AliyunMQProducer {public static void main(String[] args) {// 1. 配置 ProducerProperties properties = new Properties();properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://YourInstanceId.mq-internet.aliyuncs.com:80");properties.put(PropertyKeyConst.AccessKey, "YourAccessKey");properties.put(PropertyKeyConst.SecretKey, "YourSecretKey");properties.put(PropertyKeyConst.GROUP_ID, "YourGroupId"); // Producer Group ID// 2. 创建 ProducerProducer producer = ONSFactory.createProducer(properties);producer.start();// 3. 创建消息Message msg = new Message("YourTopic",  // Topic"YourTag",    // Tag"Hello Aliyun RocketMQ!".getBytes()  // Body);// 4. 发送消息producer.send(msg);System.out.println("消息发送成功!");// 5. 关闭 Producerproducer.shutdown();}
}

2、消费MQ

import com.aliyun.openservices.ons.api.*;
import java.util.Properties;public class AliyunMQConsumer {public static void main(String[] args) {// 1. 配置 ConsumerProperties properties = new Properties();properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://YourInstanceId.mq-internet.aliyuncs.com:80");properties.put(PropertyKeyConst.AccessKey, "YourAccessKey");properties.put(PropertyKeyConst.SecretKey, "YourSecretKey");properties.put(PropertyKeyConst.GROUP_ID, "YourGroupId"); // Consumer Group ID// 2. 创建 ConsumerConsumer consumer = ONSFactory.createConsumer(properties);// 3. 订阅 Topic 和 Tag(* 表示所有 Tag)consumer.subscribe("YourTopic", "*", new MessageListener() {@Overridepublic Action consume(Message message, ConsumeContext context) {System.out.println("收到消息: " + new String(message.getBody()));return Action.CommitMessage; // 消费成功}});// 4. 启动 Consumerconsumer.start();System.out.println("消费者已启动,等待消息...");}
}

 

  1. 阿里云 ONS SDK 更稳定,推荐使用(比 Apache RocketMQ 客户端更适配阿里云环境)。

  2. Topic 和 Group ID 需先在阿里云控制台创建,否则会报错。

  3. 生产环境建议配置重试机制和日志监控,避免消息丢失。

  4. 消费模式

    • 集群消费(CLUSTERING):同 Group ID 的多个 Consumer 分摊消息(默认)。

    • 广播消费(BROADCASTING):同 Group ID 的每个 Consumer 都收到所有消息。

二、腾讯云RocketMQ

import java.io.UnsupportedEncodingException;
import java.util.List;import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import lombok.extern.slf4j.Slf4j;/*** 腾讯云rocketMQ服务类*/
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class RocketTXMqService {@Value("${rocketmq.namespace:-1}")private String namespace;@Value("${rocketmq.producer.group:-1}")private String groupName;@Value("${rocketmq.producer.access-key:-1}")private String accessKey;@Value("${rocketmq.producer.secret-key:-1}")private String secretKey;@Value("${rocketmq.name-server:-1}")private String nameserver;// MQ生产者private DefaultMQProducer producer;// MQ实例化消费者pushprivate DefaultMQPushConsumer pushConsumer;// MQ实例化消费者pullprivate DefaultLitePullConsumer pullConsumer;/*** 创建生产者* * @return*/public DefaultMQProducer getProducer() {if (null == producer) {// 实例化消息生产者Producerproducer = new DefaultMQProducer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限);// 设置NameServer的地址producer.setNamesrvAddr(nameserver);try {// 启动Producer实例producer.start();} catch (MQClientException e) {e.printStackTrace();}}return producer;}/*** 同步发送 发送消息*/public void syncSend(String topic, String tag, String data) {producer = getProducer();// 发送消息SendResult sendResult = null;try {// 创建消息实例,设置topic和消息内容Message msg = new Message(topic, tag, data.getBytes(RemotingHelper.DEFAULT_CHARSET));sendResult = producer.send(msg);log.info("埋点信息发送腾讯云MQ:" + data);log.info("发送腾讯云MQ接口返回状态sendResult:" + sendResult);} catch (UnsupportedEncodingException e) {log.error("UnsupportedEncodingException:" + e.getMessage());} catch (MQClientException e) {log.error("MQClientException:" + e.getMessage());} catch (RemotingException e) {log.error("RemotingException:" + e.getMessage());} catch (MQBrokerException e) {log.error("MQBrokerException:" + e.getMessage());} catch (InterruptedException e) {log.error("InterruptedException:" + e.getMessage());}}/*** 创建push消费者* * @return*/public DefaultMQPushConsumer getPushConsumer() {if (null == pushConsumer) {// 实例化消费者pushConsumer = new DefaultMQPushConsumer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); // ACL权限// 设置NameServer的地址pushConsumer.setNamesrvAddr(nameserver);}return pushConsumer;}/*** 创建pull 消费者* * @return*/public DefaultLitePullConsumer getPullConsumer() {if (null == pullConsumer) {// 实例化消费者// 实例化消费者pullConsumer = new DefaultLitePullConsumer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));// 设置NameServer的地址pullConsumer.setNamesrvAddr(nameserver);// 设置从第一个偏移量开始消费pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);}return pullConsumer;}/*** push方式订阅消费* * @param topicName*/public void pushConsumer(String topicName) {pushConsumer = this.getPushConsumer();if (null != pushConsumer) {try {pushConsumer.subscribe(topicName, "*");// 注册回调实现类来处理从broker拉取回来的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑log.info("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者实例pushConsumer.start();} catch (MQClientException e) {log.error("push MQClientException:" + e.getMessage());}}}/*** pull方式订阅消费* * @param topicName*/public void pullConsumer(String topicName) {pullConsumer = this.getPullConsumer();if (null != pullConsumer) {try {// 订阅topicpullConsumer.subscribe(topicName, "*");// 启动消费者实例pullConsumer.start();} catch (MQClientException e) {log.error(" pull MQClientException:" + e.getMessage());}try {log.info("Consumer Started.%n");while (true) {// 拉取消息List<MessageExt> messageExts = pullConsumer.poll();log.info("%s%n", messageExts);}} finally {pullConsumer.shutdown();}}}}


文章转载自:

http://2QCtrVg0.dmzfz.cn
http://CYOz7Eco.dmzfz.cn
http://xetUJKLp.dmzfz.cn
http://GrUOPaVx.dmzfz.cn
http://JbeqTrk2.dmzfz.cn
http://eHdNOaLW.dmzfz.cn
http://08vRPLVY.dmzfz.cn
http://cz1cerbt.dmzfz.cn
http://ezUbwnzS.dmzfz.cn
http://hRTZ7mCJ.dmzfz.cn
http://zoLPz6dy.dmzfz.cn
http://0g02eNZ1.dmzfz.cn
http://zx6uUBKk.dmzfz.cn
http://qSSIGWym.dmzfz.cn
http://51jxtkZy.dmzfz.cn
http://nDDSYetd.dmzfz.cn
http://RFcFDtHG.dmzfz.cn
http://YS98p44b.dmzfz.cn
http://r9rsh0G3.dmzfz.cn
http://fRlQhjZX.dmzfz.cn
http://P6ITGGqi.dmzfz.cn
http://5n3wx8RH.dmzfz.cn
http://65pwu1EZ.dmzfz.cn
http://TsBRzOqQ.dmzfz.cn
http://H0vjT6eE.dmzfz.cn
http://cUMKIqXr.dmzfz.cn
http://YCUp5S1X.dmzfz.cn
http://a1TO7e3a.dmzfz.cn
http://mtV1g56J.dmzfz.cn
http://CSRQ9le3.dmzfz.cn
http://www.dtcms.com/wzjs/645913.html

相关文章:

  • ps做网站画布多大重庆任务盟网站建设
  • 网站建设案例基本流程图cms网站源码
  • 长春网站建设4435北京做网站设计招聘
  • 成都市建设学校网站最大的网站建设
  • 做网站 多少人做网站应该用什么配置的手提电脑
  • 自建网站支付问题wordpress 主题 相册
  • 临沂做网站系统烟台市网站建设
  • 太原网站建设维护专业建设内容
  • 鞍山新款网站制作哪家好创业商机网加工项目
  • 用层还是表格做网站快万能浏览器网页版
  • 北京建设部网站为什么有人做商城优惠券网站卖
  • wordpress的短代码长沙做网站优化
  • 制作公司网站价格潍坊外贸网站建设
  • 海南找人做网站网站开发需要哪些条件
  • 怎么选择网站开发公司网站建设行业赚钱么
  • 一流的网站建设公司中国建设网银登录
  • 做门户网站用什么模板网站建立软件
  • 企业网站 建设过程app软件制作公司哪家好
  • 电商网站管理系统模板下载网站建设网站排名优化
  • 怎么在网上找做网站的客户一键查询注册过的网站
  • 网站设计专业有哪些课程河南省新闻出版学校咋样
  • 北海建设网站wordpress出现不能登录界面
  • fastcomet wordpress小程序定制 seo营销
  • 玩客云 做网站服务器网站开发公司前台模板
  • 网站开发的学习路线微信公众号怎么创建账号
  • 微信网页开发教程厦门搜索引擎优化合作
  • 怎样模仿别人的网站网站建设为啥每年都要收费
  • 安丘做网站wordpress升级快速
  • 网站黑链网站策划任职要求
  • 越城网站建设公司网站搭建与推广