Spring AMQP源码解析
目录
channel和connection的区别
自动装配RabbitAutoConfiguration
消息发送流程
获取connection对象
获取channel对象
AMQConnection读取frame帧并回调publishconfirm和publishreturn
MainLoop线程监听
执行回调
channel和connection的区别
Spring AMQP 是 Spring 框架对 AMQP(高级消息队列协议)的支持,提供了一个高级抽象层,使得在 Spring 项目中使用消息队列变得更加方便。
在源码中会出现Channel和Connecttion的概念,我先来解释一下
TCP连接:TCP连接是传输层面上的连接,通常是通过IP地址和端口号建立的,RabbitMQ使用TCP协议进行网络通信,所有的消息传递都是在TCP连接上进行的。
RabbitMQ的连接:RabbitMQ的连接是指通过TCP建立的连接,通常是指Connection对,RabbitMQ在一个TCP连接上可以创建多个逻辑连接(即Channel)。
RabbitMQ的设计理念是尽量减少TCP连接的数量,推荐使用一个TCP连接来承载多个Channel,这种设计可以减少网络开销,提高性能,同时也简化了连接管理。
自动装配RabbitAutoConfiguration
@Configuration(proxyBeanMethods = false
)
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
@EnableConfigurationProperties({RabbitProperties.class})
@Import({RabbitAnnotationDrivenConfiguration.class})
public class RabbitAutoConfiguration {@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean({RabbitOperations.class})
public RabbitTemplate rabbitTemplate(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers, ConnectionFactory connectionFactory) {PropertyMapper map = PropertyMapper.get();RabbitTemplate template = new RabbitTemplate(connectionFactory);messageConverter.ifUnique(template::setMessageConverter);template.setMandatory(this.determineMandatoryFlag(properties));RabbitProperties.Template templateProperties = properties.getTemplate();if (templateProperties.getRetry().isEnabled()) {template.setRetryTemplate((new RetryTemplateFactory((List)retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))).createRetryTemplate(templateProperties.getRetry(), Target.SENDER));}templateProperties.getClass();map.from(templateProperties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);templateProperties.getClass();map.from(templateProperties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);templateProperties.getClass();map.from(templateProperties::getExchange).to(template::setExchange);templateProperties.getClass();map.from(templateProperties::getRoutingKey).to(template::setRoutingKey);templateProperties.getClass();map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);return template;
}@Bean
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {PropertyMapper map = PropertyMapper.get();CachingConnectionFactory factory = new CachingConnectionFactory((com.rabbitmq.client.ConnectionFactory)this.getRabbitConnectionFactoryBean(properties).getObject());properties.getClass();map.from(properties::determineAddresses).to(factory::setAddresses);properties.getClass();map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);properties.getClass();map.from(properties::getPublisherConfirmType).whenNonNull().to(factory::setPublisherConfirmType);RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();channel.getClass();map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);channel.getClass();map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis).to(factory::setChannelCheckoutTimeout);RabbitProperties.Cache.Connection connection = properties.getCache().getConnection();connection.getClass();map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);connection.getClass();map.from(connection::getSize).whenNonNull().to(factory::setConnectionCacheSize);connectionNameStrategy.getClass();map.from(connectionNameStrategy::getIfUnique).whenNonNull().to(factory::setConnectionNameStrategy);return factory;
}}
@ConfigurationProperties(prefix = "spring.rabbitmq"
)
public class RabbitProperties {private String host = "localhost";private int port = 5672;private String username = "guest";private String password = "guest";private final Ssl ssl = new Ssl();private String virtualHost;private String addresses;@DurationUnit(ChronoUnit.SECONDS)private Duration requestedHeartbeat;private boolean publisherReturns;private CachingConnectionFactory.ConfirmType publisherConfirmType;private Duration connectionTimeout;private final Cache cache = new Cache();private final Listener listener = new Listener();private final Template template = new Template();private List<Address> parsedAddresses;}
这里就是spring自动装配的流程,其完整流程就是SpringBoot启动->@SpringBootApplication->@EnableAutoConfiguration->AutoConfigurationImportSelector扫描META-INF/spring.factories->加载RabbitAutoConfiguration->创建RabbitMQ相关Bean
RabbitProperties类加了@ConfigurationProperties会去读取配置文件中的参数,否则就提供类属性里面的默认配置,RabbitAutoConfiguration用@Bean注解通过方法将RabbitTemplate,CachingConnectionFactory都会注册成bean,在方法里面会注入RabbitProperties的bean给他们设置参数。
消息发送流程
protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory, Message message) throws IOException {AMQP.BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding);channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
}
public class Message implements Serializable {private static final long serialVersionUID = -7177590352110605597L;private static final String DEFAULT_ENCODING = Charset.defaultCharset().name();private static final Set<String> whiteListPatterns = new LinkedHashSet(Arrays.asList("java.util.*", "java.lang.*"));private static String bodyEncoding;private final MessageProperties messageProperties;private final byte[] body;public Message(byte[] body, MessageProperties messageProperties) {this.body = body;this.messageProperties = messageProperties;}}
在通过RabbitTemplate的convertAndSend()方法发送消息的时候,先判断是否定义了RetryTemplate,在RetryTemplate对象里面会定义重试的次数,间隔,RetryTemplate对象是否为null来判断是否要重试,如果开启了重试就调调用RetryTemplate的doexecute方法,并传入RecoveryCallback,用于处理所有重试失败后的逻辑,如果没有开启重试直接调用doExecute进行处理。
doExecute会通过CachingConnectionFactory来获取channel,最后通过dosend()方法发送消息。dosend方法先进行setupConfirm()方法再调用sendToRabbit发消息,RabbitTemplate内部会将消息包装成Message对象,通过Channel.basicPublish()方法发送消息,Message内部有一个byte字节数组封装要发送的消息,还有MessageProperties封装一些属性,像消息id,做一个防止重复消费消息
获取connection对象
public final Connection createConnection() throws AmqpException {if (this.stopped) {throw new AmqpApplicationContextClosedException("The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");} else {synchronized(this.connectionMonitor) {if (this.cacheMode == CachingConnectionFactory.CacheMode.CHANNEL) {if (this.connection.target == null) {this.connection.target = super.createBareConnection();if (!this.checkoutPermits.containsKey(this.connection)) {this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));}this.connection.closeNotified.set(false);this.getConnectionListener().onCreate(this.connection);}return this.connection;} else {return this.cacheMode == CachingConnectionFactory.CacheMode.CONNECTION ? this.connectionFromCache() : null;}}}
}
CachingConnectionFactory的createConnection方法创建连接,CachingConnectionFactory内部定义了一个Object对象作为锁connectionMonitor,在获取连接的时候会进行一个上锁的操作,判断采用的策略
获取channel对象
private Channel getChannel(ChannelCachingConnectionProxy connection, boolean transactional) {Semaphore permits = null;if (this.channelCheckoutTimeout > 0L) {permits = this.obtainPermits(connection);}LinkedList<ChannelProxy> channelList = this.determineChannelList(connection, transactional);ChannelProxy channel = null;if (connection.isOpen()) {channel = this.findOpenChannel(channelList, channel);if (channel != null && this.logger.isTraceEnabled()) {this.logger.trace("Found cached Rabbit Channel: " + channel.toString());}}if (channel == null) {try {channel = this.getCachedChannelProxy(connection, channelList, transactional);} catch (RuntimeException var7) {if (permits != null) {permits.release();if (this.logger.isDebugEnabled()) {this.logger.debug("Could not get channel; released permit for " + connection + ", remaining:" + permits.availablePermits());}}throw var7;}}return channel;
}
private ChannelProxy findOpenChannel(LinkedList<ChannelProxy> channelList, ChannelProxy channelArg) {ChannelProxy channel = channelArg;synchronized(channelList) {while(!channelList.isEmpty()) {channel = (ChannelProxy)channelList.removeFirst();if (this.logger.isTraceEnabled()) {this.logger.trace(channel + " retrieved from cache");}if (channel.isOpen()) {break;}this.cleanUpClosedChannel(channel);channel = null;}return channel;}
}
private ChannelProxy getCachedChannelProxy(ChannelCachingConnectionProxy connection, LinkedList<ChannelProxy> channelList, boolean transactional) {Channel targetChannel = this.createBareChannel(connection, transactional);if (this.logger.isDebugEnabled()) {this.logger.debug("Creating cached Rabbit Channel from " + targetChannel);}this.getChannelListener().onCreate(targetChannel, transactional);Class[] interfaces;if (!CachingConnectionFactory.ConfirmType.CORRELATED.equals(this.confirmType) && !this.publisherReturns) {interfaces = new Class[]{ChannelProxy.class};} else {interfaces = new Class[]{ChannelProxy.class, PublisherCallbackChannel.class};}return (ChannelProxy)Proxy.newProxyInstance(ChannelProxy.class.getClassLoader(), interfaces, new CachedChannelInvocationHandler(connection, targetChannel, channelList, transactional));
}
private Channel createBareChannel(ChannelCachingConnectionProxy connection, boolean transactional) {if (this.cacheMode == CachingConnectionFactory.CacheMode.CHANNEL) {// 检查连接是否断开if (!this.connection.isOpen()) {synchronized(this.connectionMonitor) {// 双重检查,确保在获取锁后连接仍然是关闭状态if (!this.connection.isOpen()) {// 通知连接关闭事件this.connection.notifyCloseIfNecessary();}// 再次检查并重建连接if (!this.connection.isOpen()) {// 清除旧连接this.connection.target = null;// 创建新连接this.createConnection();}}}
}return this.doCreateBareChannel(this.connection, transactional);} else if (this.cacheMode == CachingConnectionFactory.CacheMode.CONNECTION) {if (!connection.isOpen()) {synchronized(this.connectionMonitor) {((LinkedList)this.allocatedConnectionNonTransactionalChannels.get(connection)).clear();((LinkedList)this.allocatedConnectionTransactionalChannels.get(connection)).clear();connection.notifyCloseIfNecessary();this.refreshProxyConnection(connection);}}return this.doCreateBareChannel(connection, transactional);} else {return null;}
}
private Channel doCreateBareChannel(ChannelCachingConnectionProxy conn, boolean transactional) {Channel channel = conn.createBareChannel(transactional);if (!CachingConnectionFactory.ConfirmType.NONE.equals(this.confirmType)) {try {((Channel)channel).confirmSelect();} catch (IOException var5) {this.logger.error("Could not configure the channel to receive publisher confirms", var5);}}if ((CachingConnectionFactory.ConfirmType.CORRELATED.equals(this.confirmType) || this.publisherReturns) && !(channel instanceof PublisherCallbackChannelImpl)) {channel = this.publisherChannelFactory.createChannel((Channel)channel, this.getChannelsExecutor());}if (channel != null) {((Channel)channel).addShutdownListener(this);}return (Channel)channel;
}
public class ChannelN extends AMQChannel implements Channel {private static final String UNSPECIFIED_OUT_OF_BAND = "";private static final Logger LOGGER = LoggerFactory.getLogger(ChannelN.class);private final Map<String, Consumer> _consumers;private final Collection<ReturnListener> returnListeners;private final Collection<ConfirmListener> confirmListeners;}
CachingConnectionFactory调用getChannel()获取channel,会用一个map存储Connection,和一个 Semaphore来保证每个Connection的channel数,会再获取到存储channel的LinkList,实现Channel的复用。
获取LinkList中的连接会先上个锁,防止并发下产生问题。
如果从LinkList中获取的channel为null则通过getCachedChannelProxy()方法去获取channel的一个代理对象
getCachedChannelProxy()方法先通过createBareChannel方法获取一个targetchannel,会通过Jdk代理生成ChannelProxy对象,会判断有没有开启retrun机制,开启了代理实现相应的接口,CachingConnectionFactory内部定义了一个CachedChannelInvocationHandler
createBareChannel方法会先判断有没有创建connection对象,若是connection对象为null则采用了双重检验的方法,判断加锁再判断来防止多线程创建的问题,最后调用doCreateBareChannel方法创建channel,存在直接调用doCreateBareChannel。
doCreateBareChannel通过connection创建channel,再判断是否开启confirmType,publisherReturns,在channel接口的实现类会有returnListeners,confirmListeners存储。
AMQConnection读取frame帧并回调publishconfirm和publishreturn
MainLoop线程监听
public class AMQConnection {private final MainLoop mainLoop;private volatile ChannelManager _channelManager;// 连接启动时会启动MainLoop线程public void startMainLoop() {MainLoop loop = new MainLoop();String name = "AMQP Connection " + this.getHostAddress() + ":" + this.getPort();this.mainLoopThread = Environment.newThread(this.threadFactory, loop, name);this.mainLoopThread.start();
}
public Channel createChannel(int channelNumber) throws IOException {this.ensureIsOpen();ChannelManager cm = this._channelManager;if (cm == null) {return null;} else {Channel channel = cm.createChannel(this, channelNumber);this.metricsCollector.newChannel(channel);return channel;}
}// MainLoop是一个独立线程,负责从socket读取数据// 从socket读取AMQP帧private class MainLoop implements Runnable {private MainLoop() {}public void run() {boolean shouldDoFinalShutdown = true;try {while(AMQConnection.this._running) {Frame frame = AMQConnection.this._frameHandler.readFrame();AMQConnection.this.readFrame(frame);}} catch (Throwable var6) {if (var6 instanceof InterruptedException) {shouldDoFinalShutdown = false;} else {AMQConnection.this.handleFailure(var6);}} finally {if (shouldDoFinalShutdown) {AMQConnection.this.doFinalShutdown();}}}
}
private void readFrame(Frame frame) throws IOException {if (frame != null) {this._missedHeartbeats = 0;if (frame.type != 8) {if (frame.channel == 0) {this._channel0.handleFrame(frame);} else if (this.isOpen()) {ChannelManager cm = this._channelManager;if (cm != null) {ChannelN channel;try {channel = cm.getChannel(frame.channel);} catch (UnknownChannelException var5) {LOGGER.info("Received a frame on an unknown channel, ignoring it");return;}channel.handleFrame(frame);}}}} else {this.handleSocketTimeout();}}
}
public class ChannelManager {
private final Map<Integer, ChannelN> _channelMap;private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {if (this._channelMap.containsKey(channelNumber)) {throw new IllegalStateException("We have attempted to create a channel with a number that is already in use. This should never happen. Please report this as a bug.");} else {ChannelN ch = this.instantiateChannel(connection, channelNumber, this.workService);this._channelMap.put(ch.getChannelNumber(), ch);return ch;}
}
}
AMQConnection是一些对frame帧,连接启动时会启动MainLoop线程,MainLoop是一个独立线程,负责从socket读取数据, 从socket读取AMQP帧,根据channel号找到对应的channel并处理帧,根据channel号找到对应的channel并处理帧,在AMQConnection 有个ChannelManager类型的对象,依靠他来管理创建的channel,保存在一个map里面,key为序列号,value是channel。
执行回调
public class PublisherCallbackChannelImpl implements PublisherCallbackChannel, ConfirmListener, ReturnListener, ShutdownListener {private static final MessagePropertiesConverter CONVERTER = new DefaultMessagePropertiesConverter();private static final long RETURN_CALLBACK_TIMEOUT = 60L;private final Log logger = LogFactory.getLog(this.getClass());private final Channel delegate;private final ConcurrentMap<String, PublisherCallbackChannel.Listener> listeners = new ConcurrentHashMap();private final Map<PublisherCallbackChannel.Listener, SortedMap<Long, PendingConfirm>> pendingConfirms = new ConcurrentHashMap();private final Map<String, PendingConfirm> pendingReturns = new ConcurrentHashMap();private final SortedMap<Long, PublisherCallbackChannel.Listener> listenerForSeq = new ConcurrentSkipListMap();}
public void handleAck(long seq, boolean multiple) {if (this.logger.isDebugEnabled()) {this.logger.debug(this.toString() + " PC:Ack:" + seq + ":" + multiple);}this.processAck(seq, true, multiple, true);
}public void handleNack(long seq, boolean multiple) {if (this.logger.isDebugEnabled()) {this.logger.debug(this.toString() + " PC:Nack:" + seq + ":" + multiple);}this.processAck(seq, false, multiple, true);
}
public synchronized void addPendingConfirm(PublisherCallbackChannel.Listener listener, long seq, PendingConfirm pendingConfirm) {SortedMap<Long, PendingConfirm> pendingConfirmsForListener = (SortedMap)this.pendingConfirms.get(listener);Assert.notNull(pendingConfirmsForListener, "Listener not registered: " + listener + " " + this.pendingConfirms.keySet());pendingConfirmsForListener.put(seq, pendingConfirm);this.listenerForSeq.put(seq, listener);if (pendingConfirm.getCorrelationData() != null) {String returnCorrelation = pendingConfirm.getCorrelationData().getId();if (StringUtils.hasText(returnCorrelation)) {this.pendingReturns.put(returnCorrelation, pendingConfirm);}}}
PublisherCallbackChannelImpl是channel的实现类,若是传递了correlationData会转化成PendingConfirm放到map里面,Listener是key,PendingConfirm是value的key。当broker确认消息后,会触发channel的confirm监听器,找到对应的CorrelationData,执行回调。