当前位置: 首页 > wzjs >正文

国外免费推广网站网站建设服务费如何做会计分录

国外免费推广网站,网站建设服务费如何做会计分录,电商网站业务流程图,erp管理系统多少钱一、RocketMQ 消息发送原理与模式​ 1.1 消息发送原理​ RocketMQ 消息发送的核心流程围绕 Producer、NameServer 和 Broker 展开。Producer 启动时,会向 NameServer 请求获取 Topic 的路由信息,这些信息包括 Topic 对应的 Broker 列表以及 Broker 上的…

一、RocketMQ 消息发送原理与模式​

1.1 消息发送原理​

RocketMQ 消息发送的核心流程围绕 Producer、NameServer 和 Broker 展开。Producer 启动时,会向 NameServer 请求获取 Topic 的路由信息,这些信息包括 Topic 对应的 Broker 列表以及 Broker 上的队列分布情况。Producer 拿到路由信息后,会根据特定的负载均衡策略(如轮询、随机等),选择合适的 Broker 和队列进行消息发送 。消息发送到 Broker 后,Broker 会将消息存储到 CommitLog 文件中,并更新相关的索引文件(如 ConsumeQueue 和 IndexFile),以便后续 Consumer 能够高效地查询和消费消息。​

1.2 消息发送模式​

1.2.1 同步发送​

同步发送是指 Producer 发送消息后,会阻塞等待 Broker 的响应,直到收到 Broker 返回的确认信息,才会继续执行后续的业务逻辑。这种模式保证了消息发送的可靠性,适用于对消息可靠性要求较高且允许一定等待时间的场景,如订单处理、金融交易等。​

在 CentOS 7 环境下,使用 RocketMQ 自带的示例代码演示同步发送消息:​

  1. 准备工作:确保 RocketMQ 已在 CentOS 7 上正确安装并启动 NameServer 和 Broker。进入 RocketMQ 安装目录的bin文件夹。​
  2. 修改配置:在实际项目中,若需修改 Producer 的相关配置,可通过代码中的配置项实现。例如,设置 NameServer 地址,在 Java 代码中可使用以下方式:
DefaultMQProducer producer = new 
DefaultMQProducer("producer_group_name");
producer.setNamesrvAddr("localhost:9876");

这里producer_group_name为 Producer 组名称,localhost:9876为 NameServer 的地址和端口。​

     3. 发送消息代码:执行以下命令发送同步消息:

sh tools.sh org.apache.rocketmq.example.quickstart.Producer

其底层 Java 代码实现如下:

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {try {Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}

在上述代码中,创建了DefaultMQProducer实例,设置 NameServer 地址并启动。通过循环发送 10 条消息到名为TopicTest的 Topic 中,TagA为消息标签,消息内容为自定义字符串。发送成功后,打印SendResult信息,包含消息发送的状态、队列偏移量等。​

1.2.2 异步发送​

异步发送中,Producer 发送消息后不会阻塞等待,而是继续执行后续代码,通过回调函数来处理 Broker 返回的响应结果。这种模式适用于对响应时间要求较高,且可以容忍一定消息丢失风险的场景,如日志记录、实时数据分析等。​

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class AsyncProducer {public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {final int index = i;Message msg = new Message("TopicTest","TagA",("Hello Async RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s%n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("%-10d Exception %s%n", index, e);e.printStackTrace();}});}producer.shutdown();}
}

在该代码里,producer.send方法传入了SendCallback接口的实现类,当消息发送成功时,会在onSuccess方法中打印消息发送成功的信息;若发送失败,onException方法会捕获异常并打印相关信息。​

1.2.3 单向发送​

单向发送即 Producer 只负责将消息发送出去,不关心 Broker 的响应结果,也不会阻塞等待。这种模式具有最高的发送效率,但消息可靠性较低,适用于对消息可靠性要求不高的场景,如即时通讯中的消息推送。​

代码示例如下:

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class OnewayProducer {public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("TopicTest","TagA",("Hello Oneway RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.sendOneway(msg);}producer.shutdown();}
}

sendOneway方法直接将消息发送出去,无返回值和回调处理。​

二、RocketMQ 消息消费原理与模式​

2.1 消息消费原理​

Consumer 启动后,同样会向 NameServer 获取 Topic 的路由信息。根据消费模式(集群消费或广播消费)和负载均衡策略,Consumer 会从对应的 Broker 拉取消息。在拉取消息时,Consumer 会根据本地记录的消费进度(偏移量)从 Broker 的 ConsumeQueue 中获取消息。获取到消息后,Consumer 对消息进行业务处理,处理完成后,会更新本地的消费进度,以便下次继续从正确的位置拉取消息 。​

2.2 消息消费模式​

2.2.1 集群消费​

集群消费模式下,多个 Consumer 实例组成一个消费组,共同消费 Topic 中的消息。每个消息只会被消费组中的一个 Consumer 实例消费,这种模式可以提高消息的消费效率,适用于大部分业务场景。​

在 CentOS 7 上,使用以下命令运行集群消费示例:

sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

Java 代码实现如下:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class ClusterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");consumer.setNamesrvAddr("localhost:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

在上述代码中,创建了DefaultMQPushConsumer实例,设置 NameServer 地址和消费起始位置(从第一条消息开始消费),订阅TopicTest主题下的所有消息。通过registerMessageListener注册消息监听器,当接收到消息时,在consumeMessage方法中打印消息内容,并返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消费成功。​

2.2.2 广播消费​

广播消费模式下,Topic 中的消息会被消费组中的所有 Consumer 实例消费。这种模式适用于需要所有消费者都处理相同消息的场景,如系统配置更新通知。​

代码示例如下:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class BroadcastConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");consumer.setNamesrvAddr("localhost:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setMessageModel(org.apache.rocketmq.common.message.MessageModel.BROADCASTING);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

与集群消费代码的主要区别在于,通过setMessageModel方法将消费模式设置为BROADCASTING,表示广播消费。​

三、常见问题与解决​

3.1 消息发送失败​

1、原因分析:可能是 NameServer 地址配置错误、Broker 未正常启动、网络连接问题,或者消息大小超过 Broker 配置的限制等。​

2、解决方法:检查 NameServer 地址和端口是否正确;通过日志查看 Broker 的启动状态;检查网络连接是否正常;若消息过大,可调整 Broker 的maxMessageSize配置项,在broker.conf文件中添加或修改:

maxMessageSize=65536

这里将消息最大大小设置为 64KB。

3.2 消息消费异常​

1、原因分析:可能是消费逻辑代码出现异常、消息格式错误、消费进度记录异常等。​

2、解决方法:通过查看 Consumer 的日志,定位消费逻辑中的异常代码;检查消息格式是否符合预期;若消费进度异常,可手动调整消费偏移量,在代码中通过DefaultMQPushConsumer的相关方法实现,如:

consumer.resetOffset("TopicTest", queueId, offset);

其中TopicTest为主题名,queueId为队列编号,offset为指定的偏移量。


文章转载自:

http://PNIrLDhH.sLqzb.cn
http://lwWtp9j1.sLqzb.cn
http://EEZwbryk.sLqzb.cn
http://ERaj1ZeP.sLqzb.cn
http://LQGGwbD3.sLqzb.cn
http://ZH8vkrYo.sLqzb.cn
http://QyeviNfn.sLqzb.cn
http://bYCgfTfe.sLqzb.cn
http://ymDQYFwM.sLqzb.cn
http://8YslTaT6.sLqzb.cn
http://m9QWH4VV.sLqzb.cn
http://wIp3JTSY.sLqzb.cn
http://ALbHCTRD.sLqzb.cn
http://mUThNbLT.sLqzb.cn
http://TKIlAQgc.sLqzb.cn
http://8Fv1X9gj.sLqzb.cn
http://NDJVAi0W.sLqzb.cn
http://A2A1MRiU.sLqzb.cn
http://gtjQpgJa.sLqzb.cn
http://IjJYRN45.sLqzb.cn
http://pHE1zsLd.sLqzb.cn
http://l8tQNxRT.sLqzb.cn
http://1n1L0Gwa.sLqzb.cn
http://x7z8NQN2.sLqzb.cn
http://HT5kK03f.sLqzb.cn
http://4QC21eEl.sLqzb.cn
http://BmkRLHT6.sLqzb.cn
http://DSRrSuKm.sLqzb.cn
http://11gf9Zhx.sLqzb.cn
http://PEAvRusQ.sLqzb.cn
http://www.dtcms.com/wzjs/672903.html

相关文章:

  • 网站开发工作总结东莞教育团购网站建设
  • 建立网站需要多长钱大朗做网站
  • 广东建设监理协会网站题库浙江省旅游企业网站建设情况
  • 青岛网站建设报价网站主页建设格式
  • 首信建设网站网站怎么做站群
  • 网站建设及相关流程图淮南网站制作
  • 多语言网站怎么实现域名查询 站长查询
  • 微信上的网站怎么做的国内做家具外贸的网站
  • 建设门户网站需要注意什么意思建立良好的公共秩序教学设计
  • 东莞数据线厂家东莞网站建设网络营销策划推广公司一一
  • 建设网站需要问的问题手机网站信任从哪里设置
  • 怎么用flashfxp上传网站学习建设网站需要多久
  • 开通网站流程高端seo服务
  • 做网站优化时 链接名称"首页"有必要添加nofollow吗?破解网站后台密码有人做吗
  • 佛山企业网站推广263企业邮箱入口登录方法
  • 网站建设要注意哪些计算机软件开发培训机构
  • 搜狗站长平台主动提交wordpress批量发邮
  • 品牌策划费用哈尔滨优化关键词免费
  • 阿里云服务器可以做商业网站php网站开发工程师招聘要求
  • 北京响应式h5网站开发登錄wordpress界面
  • 中小企业网站制作过程中要注意什么纸巾 技术支持 东莞网站建设
  • 高端网站定制设计公司沭阳建设局网站
  • 淄博市建设局网站营销网站优化推广
  • 重庆网站设计重庆最加科技谷歌浏览器官方app下载
  • 大连百度推广开户网站优化推广 视屏
  • 如何做问卷调查网站论坛交流平台有哪些
  • 鞍山市城乡建设局网站策划书怎么写 范文
  • 网站专门做冻品的WordPress外链网盘
  • 佛山做外贸网站个人网页需要什么内容
  • 南京市雨花区建设局网站检察门户网站 建设意义