当前位置: 首页 > wzjs >正文

如何做网站定位成都市微信网站建设

如何做网站定位,成都市微信网站建设,做百度推广需要自己有个网站吗,产品营销推广策略1. 概述 RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销…

1. 概述

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力

Spring Cloud Stream

2.1 Spring Cloud Stream 是什么

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,它通过 Spring Integration 与消息中间件(如 RabbitMQ、Kafka、RocketMQ)进行连接。

2.2 核心概念

  • Binder:与消息中间件集成的组件,负责创建对应的 Binding。
  • Binding:消息中间件与应用程序之间的桥梁,分为 Input Binding(用于消费消息)和 Output Binding(用于生产消息)。

2.3 Broker 的角色

Broker 是消息队列中间件的代理服务器,负责存储消息、转发消息。例如,在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储,同时为消费者的拉取请求作准备。

三、快速入门

3.1 搭建生产者

3.1.1 引入依赖

pom.xml 中引入 Spring Cloud Alibaba RocketMQ 相关依赖:

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
3.1.2 配置文件

application.yaml 中添加 Spring Cloud Alibaba RocketMQ 相关配置:

spring:application:name: demo-producer-applicationcloud:stream:bindings:demo01-output:destination: DEMO-TOPIC-01content-type: application/jsonrocketmq:binder:name-server: 127.0.0.1:9876bindings:demo01-output:producer:group: testsync: true
3.1.3 创建 MySource 接口

声明名字为 Output Binding:

public interface MySource {@Output("demo01-output")MessageChannel demo01Output();
}
3.1.4 创建 Demo01Message 类

作为示例消息:

public class Demo01Message {private Integer id;// getter 和 setter 方法
}
3.1.5 创建 Demo01Controller 类

提供发送消息的 HTTP 接口:

@RestController
@RequestMapping("/demo01")
public class Demo01Controller {@Autowiredprivate MySource mySource;@GetMapping("/send")public boolean send() {Demo01Message message = new Demo01Message().setId(new Random().nextInt());Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).build();return mySource.demo01Output().send(springMessage);}
}
3.1.6 创建 ProducerApplication 类

启动应用:

@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}
}

3.2 搭建消费者

3.2.1 引入依赖

与生产者类似,引入 Spring Cloud Alibaba RocketMQ 相关依赖。

3.2.2 配置文件

application.yaml 中添加消费者相关的配置:

spring:application:name: demo-consumer-applicationcloud:stream:bindings:demo01-input:destination: DEMO-TOPIC-01content-type: application/jsongroup: demo01-consumer-group-DEMO-TOPIC-01rocketmq:binder:name-server: 127.0.0.1:9876bindings:demo01-input:consumer:enabled: truebroadcasting: false
3.2.3 创建 MySink 接口

声明名字为 Input Binding:

public interface MySink {String DEMO01_INPUT = "demo01-input";@Input(DEMO01_INPUT)SubscribableChannel demo01Input();
}
3.2.4 创建 Demo01Message 类

与生产者一致。

3.2.5 创建 Demo01Consumer 类

消费消息:

@Component
public class Demo01Consumer {private Logger logger = LoggerFactory.getLogger(getClass());@StreamListener(MySink.DEMO01_INPUT)public void onMessage(@Payload Demo01Message message) {logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);}
}
3.2.6 创建 ConsumerApplication 类

启动应用:

@SpringBootApplication
@EnableBinding(MySink.class)
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}
}

四、定时消息

4.1 定时消息的概念

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

4.2 实现定时消息

在发送消息时,通过设置消息的延迟级别来实现定时消息。例如:

@GetMapping("/send_delay")
public boolean sendDelay() {Demo01Message message = new Demo01Message().setId(new Random().nextInt());Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 设置延迟级别为 3,10 秒后消费.build();return mySource.demo01Output().send(springMessage);
}

五、消费重试

5.1 消费重试的机制

当消息消费失败时,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer,让 Consumer 有机会重新消费消息。

5.2 配置消费重试

在配置文件中设置消费重试相关的配置项:

spring:cloud:stream:bindings:demo01-input:consumer:max-attempts: 1rocketmq:bindings:demo01-input:consumer:delay-level-when-next-consume: 0

六、消费异常处理机制

6.1 异常处理的方式

Spring Cloud Stream 提供了通用的消费异常处理机制,可以通过 @ServiceActivator@StreamListener 注解订阅错误通道,实现自定义的异常处理逻辑。

6.2 实现异常处理

在消费者中添加异常处理方法:

@Component
public class Demo01Consumer {// ...@ServiceActivator(inputChannel = "DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.errors")public void handleError(ErrorMessage errorMessage) {logger.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));}@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)public void globalHandleError(ErrorMessage errorMessage) {logger.error("[globalHandleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));}
}

七、广播消费

7.1 广播消费的概念

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

7.2 配置广播消费

在配置文件中设置 broadcasting 配置项为 true

spring:cloud:stream:bindings:demo01-input:consumer:broadcasting: true

八、顺序消息

8.1 顺序消息的概念

RocketMQ 支持普通顺序消息和完全严格顺序消息,确保消息按顺序消费。

8.2 实现顺序消息

在生产者中设置分区 key 表达式,在消费者中设置顺序消费:

# 生产者配置
spring:cloud:stream:bindings:demo01-output:producer:partition-key-expression: payload['id']rocketmq:bindings:demo01-output:producer:group: testsync: true# 消费者配置
spring:cloud:stream:bindings:demo01-input:consumer:orderly: true

九、消息过滤

9.1 消息过滤的方式

RocketMQ 提供基于 Tag 和 SQL92 的消息过滤方式。

9.2 基于 Tag 过滤

在生产者中设置消息的 Tag,在消费者中设置过滤的 Tag:

// 生产者
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_TAGS, "yunai").build();// 消费者配置
spring:cloud:stream:bindings:demo01-input:consumer:tags: yunai || yutou

9.3 基于 SQL92 过滤

在消费者中设置 SQL92 过滤表达式:

spring:cloud:stream:bindings:demo01-input:consumer:sql: "id > 100"

十、事务消息

10.1 事务消息的概念

RocketMQ 提供完整的事务消息功能,确保分布式事务的最终一致性。

10.2 实现事务消息

在生产者中发送事务消息,并实现事务监听器:

@GetMapping("/send_transaction")
public boolean sendTransaction() {Demo01Message message = new Demo01Message().setId(new Random().nextInt());Args args = new Args().setArgs1(1).setArgs2("2");Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).setHeader("args", JSON.toJSONString(args)).build();return mySource.demo01Output().send(springMessage);
}@RocketMQTransactionListener(txProducerGroup = "test")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务逻辑return RocketMQLocalTransactionState.UNKNOWN;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 回查本地事务状态return RocketMQLocalTransactionState.COMMIT;}
}

十一、监控端点

11.1 监控端点的作用

Spring Cloud Stream 提供了自定义监控端点,用于获取 Binding 和 Channel 信息,以及 RocketMQ 客户端的健康状态。

11.2 配置监控端点

pom.xml 中引入 Spring Boot Actuator 相关依赖,并在配置文件中开放监控端点:

management:endpoints:web:exposure:include: '*'endpoint:health:enabled: trueshow-details: ALWAYS

十二、更多的配置项信息

12.1 RocketMQ Binder Properties

配置项包括 name-serveraccess-keysecret-key 等。

12.2 RocketMQ Consumer Properties

配置项包括 enabletagssqlbroadcastingorderly 等。

12.3 RocketMQ Provider Properties

配置项包括 enablegroupmaxMessageSizetransactional 等。

十三、接入阿里云的消息队列 RocketMQ

13.1 配置阿里云 RocketMQ

在配置文件中设置访问阿里云 RocketMQ 的账号、Namesrv 地址等参数:

spring:cloud:stream:bindings:demo01-output:destination: TOPIC_YUNAI_TESTrocketmq:binder:name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80access-key: ${ALIYUN_ACCESS_KEY}secret-key: ${ALIYUN_SECRET_KEY}bindings:demo01-output:producer:group: GID_PRODUCER_GROUP_YUNAI_TESTsync: true

总结

本文详细介绍了如何在 Spring Cloud Alibaba 中使用 RocketMQ 作为消息队列,从基础概念到快速入门,再到高级特性,如定时消息、消费重试、广播消费等,帮助开发者全面了解并应用 RocketMQ 到实际项目中。


文章转载自:

http://es7MgL0c.fxxmj.cn
http://K33LTOxZ.fxxmj.cn
http://TXXHYuAQ.fxxmj.cn
http://M4YeRoJP.fxxmj.cn
http://Kqwjd5fa.fxxmj.cn
http://BNQWqH5p.fxxmj.cn
http://f7JYi2bQ.fxxmj.cn
http://MyAyIyND.fxxmj.cn
http://XrHaU68I.fxxmj.cn
http://Pwnk8aZr.fxxmj.cn
http://7qQsZcFQ.fxxmj.cn
http://1oH5BZ57.fxxmj.cn
http://bMv09hRw.fxxmj.cn
http://iIj38msi.fxxmj.cn
http://QVrotqfT.fxxmj.cn
http://ibqK7rsy.fxxmj.cn
http://Zb1BiLf2.fxxmj.cn
http://sabTKwVi.fxxmj.cn
http://SXPN7ibW.fxxmj.cn
http://UK7q0rqn.fxxmj.cn
http://DdU9yp8K.fxxmj.cn
http://bvMZ2fNk.fxxmj.cn
http://evnqtbko.fxxmj.cn
http://4zOOw34h.fxxmj.cn
http://sZ1dArz9.fxxmj.cn
http://NotVhHKl.fxxmj.cn
http://weD5c5cN.fxxmj.cn
http://e694kopk.fxxmj.cn
http://bWTSXT4S.fxxmj.cn
http://WnfMZLQH.fxxmj.cn
http://www.dtcms.com/wzjs/715935.html

相关文章:

  • 如何做deal网站推广外贸机械加工网
  • 网站建设合同怎么交印花税给别人做网站收钱违法吗
  • 贵阳官方网站京网站建设
  • 免费的英文电子外贸网站建设军事网址大全 网站
  • 外贸网站建设内容包括公司网站程序
  • 网站制作流程白云商城网站建设
  • 金融投资公司网站模板彩票网站开发需求文档
  • 定制企业网站开发公司crm系统公司有哪些
  • 书城网站建设项目定义网站建设服务器什么意思
  • 广州外贸建站网站建设思维导图
  • wordpress 公司建站西安网站制作排名
  • 怎么建设影视网站2023新闻头条最新消息今天
  • 阳江做网站的公司南通技嘉做网站
  • 受欢迎的丹阳网站建设dw制作网页版面教程视频
  • 东圃手机网站建设网站做的一样算侵权吗
  • 中英文网站案例公司装修属于什么费用
  • 网页设计与网站开发的区别百度指数做网站
  • 网站建设价位高有低wordpress 分类文章插件
  • 网站设计站制作图片软件下载
  • 婚纱设计网站模板商城wordpress文章加音频
  • 建网站需要买服务器吗网站建设与电子商务的教案
  • 海南企业建站网络维护技术
  • 购物网站有哪些功能公司网站怎么写
  • 微信表情包制作网站响应式网页设计原理
  • 熟人做网站怎么收钱网站建立的步骤是( )。
  • 贵阳建网站WordPress输出当前网址
  • 承德网站建设步骤深圳关键词优化报价
  • 网站后台登陆网址是多少鹤岗住房和城乡建设局网站
  • 网站页脚导航带积分的网站建设
  • 广州免费建站哪里有网站建设宣传 mp4