当前位置: 首页 > wzjs >正文

包头网站建设制作好搜自然seo

包头网站建设制作,好搜自然seo,多店铺商城系统开源,电信宽带做网站服务器系列博客专栏: JVM系列博客专栏SpringBoot系列博客 Spring Boot 2.2.1 集成 RabbitMQ 实现高效流量控制 在分布式系统中,消息队列是实现异步通信、解耦服务的重要组件。RabbitMQ 作为一款成熟的开源消息队列,广泛应用于各类项目中。本文将…

系列博客专栏:

  • JVM系列博客专栏
  • SpringBoot系列博客

Spring Boot 2.2.1 集成 RabbitMQ 实现高效流量控制

在分布式系统中,消息队列是实现异步通信、解耦服务的重要组件。RabbitMQ 作为一款成熟的开源消息队列,广泛应用于各类项目中。本文将结合 Spring Boot 2.2.1,详细介绍如何集成 RabbitMQ 并实现基于队列长度、内存和磁盘的流量控制,同时引入服务端限流配置,进一步提升系统的稳定性与可靠性。

一、RabbitMQ 流量控制的重要性

当消息产生速度过快,超过消息队列的处理能力时,可能会导致队列积压、系统性能下降甚至崩溃。通过流量控制,可以有效限制消息的流入速度,使系统能够在合理的负载下运行,保障服务的稳定性和可靠性。

二、Spring Boot 2.2.1 集成 RabbitMQ 基础配置

1. 引入依赖

pom.xml 文件中添加 Spring Boot AMQP 和 Web 依赖:

<dependencies><!-- Spring Boot Starter AMQP --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- JSON处理依赖 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- RabbitMQ测试依赖 --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. 配置文件

application.yml 中配置 RabbitMQ 连接信息和相关参数:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /requested-heartbeat: 30connection-timeout: 10000publisher-confirms: truepublisher-returns: truelistener:simple:acknowledge-mode: autoprefetch: 50concurrency: 3max-concurrency: 10cache:channel:size: 50checkout-timeout: 30000connection:mode: CHANNELsize: 5# 自定义流量控制配置
app:flow-control:max-messages: 1000duration: 5000

3. RabbitMQ 配置类

创建 RabbitMQConfig 类,配置队列、交换机、绑定关系、消息转换器以及 RabbitTemplate:

package com.example.springboot.rabbitmq.configuration;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class RabbitMQConfig {public static final String QUEUE_NAME = "flow.control.queue";public static final String EXCHANGE_NAME = "flow.control.exchange";public static final String ROUTING_KEY = "flow.control.key";// 配置队列@Beanpublic Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).build();}// 配置交换机@Beanpublic DirectExchange exchange() {return new DirectExchange(EXCHANGE_NAME);}// 绑定队列和交换机@Beanpublic Binding binding(Queue queue, DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}// 配置消息转换器@Beanpublic Jackson2JsonMessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// 配置RabbitTemplate@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter);// 设置mandatory标志,确保消息在无法路由时返回rabbitTemplate.setMandatory(true);// 设置发布确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("消息发送成功: {}",  correlationData);} else {log.warn("消息发送失败: {}",  cause);}});// 设置返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息被退回: {}", new String(message.getBody()));log.info("回复码: ", replyCode);log.info("回复文本: ", replyText);log.info("交换机: ", exchange);log.info("路由键: ", routingKey);});return rabbitTemplate;}// 配置监听器容器工厂@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(messageConverter);factory.setConcurrentConsumers(3); // 设置并发消费者数量factory.setMaxConcurrentConsumers(10);factory.setPrefetchCount(50); // 设置 QoSfactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认模式return factory;}
}

三、基于队列长度的流量控制

MessageProducer 类中实现基于队列长度的流量控制逻辑:

package com.example.demo.service;import com.example.demo.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;private final AtomicInteger messageCount = new AtomicInteger(0);private static final int MAX_MESSAGES = 1000;private volatile boolean flowControlEnabled = false;public void sendMessage(String message) {if (flowControlEnabled) {System.out.println("流量控制已启用,暂停发送消息");return;}if (messageCount.get() >= MAX_MESSAGES) {System.out.println("达到最大消息数量,触发流量控制");enableFlowControl(5000);return;}String correlationId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(correlationId);rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.ROUTING_KEY,message,correlationData);messageCount.incrementAndGet();System.out.println("发送消息: " + message + ", 消息ID: " + correlationId);}public void enableFlowControl(long durationMillis) {flowControlEnabled = true;System.out.println("流量控制已启用,持续时间: " + durationMillis + "ms");new Thread(() -> {try {Thread.sleep(durationMillis);} catch (InterruptedException e) {Thread.currentThread().interrupt();}flowControlEnabled = false;messageCount.set(0);System.out.println("流量控制已禁用");}).start();}
}

除了用代码限制外,可以用maxLength设置,示例代码:

 // 配置队列@Beanpublic Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).build();}

四、x-max-length-bytes 参数详解

x-max-length-bytes 用于限制队列中消息的总字节数。在创建队列时,可以通过代码配置:

@Bean
public Queue queue() {return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).maxLengthBytes(1024 * 1024 * 10) // 设置队列消息总字节数上限为10MB.build();
}

当队列中消息的总字节数达到设定的阈值时,后续新消息的处理策略由 x-overflow 参数决定:

  • drop-head:丢弃队列头部的消息,为新消息腾出空间。
  • reject-publish:拒绝接收新消息,并向生产者返回 Basic.Reject 响应。

五、基于内存和磁盘的流量控制

通过配置 RabbitMQ 服务器的内存和磁盘告警阈值,当服务器内存使用或磁盘空间达到阈值时,会自动触发流量控制。例如:

rabbitmqctl set_vm_memory_high_watermark 0.6

此命令将内存高水位线设置为系统内存的 60%。

六、服务端限流配置

1. 基于 Guava 的限流实现

添加 Guava 依赖:

<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>28.2-jre</version>
</dependency>

使用 RateLimiter 进行限流:

package com.example.demo.service;import com.google.common.util.concurrent.RateLimiter;
import org.springframework.stereotype.Service;@Service
public class LimitedService {private final RateLimiter rateLimiter = RateLimiter.create(5);public void limitedMethod() {if (rateLimiter.tryAcquire()) {System.out.println("请求被处理");} else {System.out.println("请求被限流");}}
}

七、 消费端限流

默认情况下,如果不进行配置,RabbitMQ会尽可能快速地把队列中的消息发送到消费者。如果消息数量过多,可能会导致OOM或者影响其他进程的正常运行

1. 消费端限流示例

package com.example.springboot.rabbitmq.service;import com.example.springboot.rabbitmq.configuration.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;@Service
@Slf4j
public class MessageConsumer {@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)@Retryable(value = {IOException.class}, maxAttempts = 3,backoff = @Backoff(delay = 2000, multiplier = 2))public void receiveMessage(Message message, Channel channel) throws IOException {try {if (channel == null || !channel.isOpen()) {log.warn("Channel is closed or null, unable to process message");return;}// 动态设置预取计数channel.basicQos(calculatePrefetchCount());String content = new String(message.getBody());log.info("接收到消息:{} ", content);// 模拟消息处理时间Thread.sleep(100);// 发送消息确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("消息处理完成");} catch (Exception e) {log.error("处理消息时发生错误: {}", e.getMessage(), e);if (channel != null && channel.isOpen()) {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 失败后重新入队}}}// 根据系统负载动态计算预取计数private int calculatePrefetchCount() {double cpuLoad = getSystemCpuLoad();int basePrefetch = 10;return (int) Math.max(1, basePrefetch * (1 - cpuLoad));}// 获取当前系统 CPU 负载private double getSystemCpuLoad() {OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();return osBean.getSystemLoadAverage() / osBean.getAvailableProcessors();}}

八、总结

通过上述配置和代码示例,您可以实现对 RabbitMQ 的高效流量控制,从而提升系统的稳定性和可靠性。合理利用队列长度限制、内存和磁盘流量控制,以及服务端限流策略,可以帮助系统在高负载情况下保持良好的运行状态。

http://www.dtcms.com/wzjs/107037.html

相关文章:

  • 美女做暖暖免费视频2017网站深圳seo优化外包
  • 为什么做的网站打开自动缩放班级优化大师下载安装
  • 做第三方的qq互联接口时_回调到自己的网站时要延时很久是什么原因徐州seo排名收费
  • 企业网站建设兴田德润实惠哈尔滨seo关键词排名
  • wordpress如何自建站成人电脑培训班办公软件
  • 高端营销型网站建设品牌常州网站建设书生商友
  • 从哪些方面评价一个企业的网站建设seo入门版
  • 黄金app软件下载大全免费抖音seo排名
  • 织梦电子行业网站模板seo网站排名优化软件
  • 河北网站建站制作优化公司流程制度
  • 做网站必须花钱吗seo优化排名是什么
  • 基本的网站建设知识青岛做网站的公司哪家好
  • 长沙疫情最新情况2024年百度seo排名点击器
  • 做衣服的网站百度安装应用
  • git做网站根目录百度小说搜索排行榜
  • 用html框架做网站广告优化师发展前景
  • 申请注册邮箱衡阳网站优化公司
  • 学做网站难不难千锋教育和黑马哪个好
  • 做网站和做网页有什么区别网站建设制作公司
  • 企业网站建设排名桂林网页
  • 怎么做网页截图安徽百度seo公司
  • 网站开发与客户交流最新热点新闻事件
  • 网站开发 案例seo优化裤子关键词
  • 西安知名的集团门户网站建设公司淘宝的关键词排名怎么查
  • 泰安网站建设哪家快英文外链seo兼职在哪里找
  • 基于p2p的网站建设搜索引擎推广简称
  • 贵州住房建设厅网站优化大师手机版下载
  • 苏州网站怎么做seo站长
  • 沈阳网站建设工作室seo百度快速排名
  • 尉氏网站建设开发app需要多少资金