- 配置文件,微服务或者单体都可以在yml里配置,也可以写死在java文件.
spring:rabbitmq:addresses: 127.0.0.1:5672username: guestpassword: guestvirtual-host: /connection-timeout: 15000enable: truelistener:simple:acknowledge-mode: manualconcurrency: 15max-concurrency: 20
- 监听处理文件
package jzy.ziyuan.util.rabbitmq;import cn.hutool.json.JSONObject;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import jnpf.util.JsonUtil;
import jzy.model.mom.EventMessageMom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;@Component
@ConditionalOnProperty(name = "spring.rabbitmq.enable", havingValue = "true", matchIfMissing = true)
public class MultiExchangeMessageListener {private static final Logger logger = LoggerFactory.getLogger(MultiExchangeMessageListener.class);@Autowiredprivate IdempotentMessageProcessor idempotentProcessor;@Autowiredprivate RabbitTemplate rabbitTemplate;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "order.queue", durable = "true"), exchange = @Exchange(name = "20250917test1", type = ExchangeTypes.FANOUT) ))@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "user.queue", durable = "true"), @RabbitListener(queues = "#{tempQueue.name}") public void handleMessage(Message message) throws IOException {String messageId = message.getMessageProperties().getMessageId();String exchange = message.getMessageProperties().getReceivedExchange();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {if (idempotentProcessor.isProcessed(messageId)) {logger.info("消息已处理,跳过重复消费: messageId={}, exchange={}", messageId, exchange);rabbitTemplate.getConnectionFactory().createConnection().createChannel(false).basicAck(deliveryTag, false);return;}String messageBody = new String(message.getBody(), "UTF-8");String processedMessage = replaceNbspWithSpace(messageBody);JSONObject jsonObject = JsonUtil.getJsonToBean(processedMessage, JSONObject.class);EventMessageMom eventMessage = JsonUtil.getJsonToBean(jsonObject, EventMessageMom.class);logger.info("接收来自交换机的消息: exchange={}, messageId={}, content={}",exchange, messageId, eventMessage);switch (exchange) {default:break;}idempotentProcessor.markAsProcessed(messageId);rabbitTemplate.getConnectionFactory().createConnection().createChannel(false).basicAck(deliveryTag, false);logger.info("消息处理成功: messageId={}, exchange={}", messageId, exchange);} catch (Exception e) {logger.error("消息处理失败: messageId={}, exchange={}", messageId, exchange, e);rabbitTemplate.getConnectionFactory().createConnection().createChannel(false).basicNack(deliveryTag, false, false); }}public static String replaceNbspWithSpace(String str) {if (str == null) {return null;}return str.replaceAll("\\u00A0", " ");}
}
- rabbitmq配置文件
package jzy.ziyuan.util.rabbitmq;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class MultiExchangeRabbitConfig {public static final String[] EXCHANGE_NAMES = {"exchange.20250918logs","exchange.20250918events","exchange.20250918notifications"};public static final String EXCHANGE_TYPE = "fanout";public static final String DLX_EXCHANGE = "20250918exchange.dlx";public static final String DLX_QUEUE = "20250918queue.dlx";@Beanpublic DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(DLX_EXCHANGE).durable(true).build();}@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(DLX_QUEUE).build();}@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key");}@Beanpublic Queue tempQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE);args.put("x-dead-letter-routing-key", "dlx.routing.key");return QueueBuilder.nonDurable() .exclusive() .autoDelete() .withArguments(args).build();}@Beanpublic Declarables exchangeBindings() {Declarables declarables = new Declarables();for (String exchangeName : EXCHANGE_NAMES) {FanoutExchange exchange = ExchangeBuilder.fanoutExchange(exchangeName).durable(true).build();declarables.getDeclarables().add(exchange);Binding binding = BindingBuilder.bind(tempQueue()).to(exchange);declarables.getDeclarables().add(binding);}return declarables;}@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setPrefetchCount(10); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(5); factory.setRecoveryInterval(5000L); return factory;}
}