当前位置: 首页 > wzjs >正文

专业房产网站建设公司排名本溪seo优化

专业房产网站建设公司排名,本溪seo优化,做网站多少钱啊,大连雄猫网络推广有限公司依赖项和配置信息参见另一篇博文KafkaListener的配置使用,这里主要借助源码分析KafkaListener和KafkaTemplate自动装配原理。 1、KafkaAutoConfiguration 源码分析 KafkaAutoConfiguration类自动装配生成了生产者客户端KafkaTemplate的bean和消费者基础ConsumerFa…

依赖项和配置信息参见另一篇博文@KafkaListener的配置使用,这里主要借助源码分析@KafkaListener和KafkaTemplate自动装配原理。

1、KafkaAutoConfiguration 源码分析

KafkaAutoConfiguration类自动装配生成了生产者客户端KafkaTemplate的bean和消费者基础ConsumerFactory的bean,KafkaAutoConfiguration导入KafkaAnnotationDrivenConfiguration,KafkaAnnotationDrivenConfiguration最终生成了ConcurrentKafkaListenerContainerFactory的bean, 该bean是@KafkaListener默认使用的容器工厂,即指定了消费的kafka集群。

package org.springframework.boot.autoconfigure.kafka;import java.io.IOException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;@Configuration(proxyBeanMethods = false
)
/// KafkaTemplate在类路径下存在时,加载该配置类
@ConditionalOnClass({KafkaTemplate.class})  
@EnableConfigurationProperties({KafkaProperties.class})
/// 将KafkaAnnotationDrivenConfiguration、KafkaStreamsAnnotationDrivenConfiguration配置类合并过来
@Import({KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class})  
public class KafkaAutoConfiguration {private final KafkaProperties properties;public KafkaAutoConfiguration(KafkaProperties properties) {this.properties = properties;}@Bean  /// KafkaTemplate的bean未定义时,自动生成该bean@ConditionalOnMissingBean({KafkaTemplate.class})public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean@ConditionalOnMissingBean({ProducerFactory.class})public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory(this.properties.buildProducerProperties());String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) -> {customizer.customize(factory);});return factory;}@Bean@ConditionalOnMissingBean({ProducerListener.class})public ProducerListener<Object, Object> kafkaProducerListener() {return new LoggingProducerListener();}@Bean/// ConsumerFactory的bean未定义时,自动生成该bean@ConditionalOnMissingBean({ConsumerFactory.class})public ConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) -> {customizer.customize(factory);});return factory;}@Bean@ConditionalOnProperty(name = {"spring.kafka.producer.transaction-id-prefix"})@ConditionalOnMissingBeanpublic KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {return new KafkaTransactionManager(producerFactory);}@Bean@ConditionalOnProperty(name = {"spring.kafka.jaas.enabled"})@ConditionalOnMissingBeanpublic KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();KafkaProperties.Jaas jaasProperties = this.properties.getJaas();if (jaasProperties.getControlFlag() != null) {jaas.setControlFlag(jaasProperties.getControlFlag());}if (jaasProperties.getLoginModule() != null) {jaas.setLoginModule(jaasProperties.getLoginModule());}jaas.setOptions(jaasProperties.getOptions());return jaas;}@Bean@ConditionalOnMissingBeanpublic KafkaAdmin kafkaAdmin() {KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());return kafkaAdmin;}
}

2、KafkaAnnotationDrivenConfiguration 源码分析

package org.springframework.boot.autoconfigure.kafka;import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;/*** Configuration for Kafka annotation-driven support.** @author Gary Russell* @author Eddú Meléndez*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {private final KafkaProperties properties;private final RecordMessageConverter messageConverter;private final BatchMessageConverter batchMessageConverter;private final KafkaTemplate<Object, Object> kafkaTemplate;private final KafkaAwareTransactionManager<Object, Object> transactionManager;private final ConsumerAwareRebalanceListener rebalanceListener;private final ErrorHandler errorHandler;private final BatchErrorHandler batchErrorHandler;private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;private final RecordInterceptor<Object, Object> recordInterceptor;KafkaAnnotationDrivenConfiguration(KafkaProperties properties,ObjectProvider<RecordMessageConverter> messageConverter,ObjectProvider<BatchMessageConverter> batchMessageConverter,ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,ObjectProvider<BatchErrorHandler> batchErrorHandler,ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {this.properties = properties;this.messageConverter = messageConverter.getIfUnique();this.batchMessageConverter = batchMessageConverter.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));this.kafkaTemplate = kafkaTemplate.getIfUnique();this.transactionManager = kafkaTransactionManager.getIfUnique();this.rebalanceListener = rebalanceListener.getIfUnique();this.errorHandler = errorHandler.getIfUnique();this.batchErrorHandler = batchErrorHandler.getIfUnique();this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();this.recordInterceptor = recordInterceptor.getIfUnique();}@Bean@ConditionalOnMissingBeanConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();configurer.setKafkaProperties(this.properties);MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH))? this.batchMessageConverter : this.messageConverter;configurer.setMessageConverter(messageConverterToUse);configurer.setReplyTemplate(this.kafkaTemplate);configurer.setTransactionManager(this.transactionManager);configurer.setRebalanceListener(this.rebalanceListener);configurer.setErrorHandler(this.errorHandler);configurer.setBatchErrorHandler(this.batchErrorHandler);configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);configurer.setRecordInterceptor(this.recordInterceptor);return configurer;}@Bean@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")  /// ConcurrentKafkaListenerContainerFactory的bean未定义时,自动生成该bean(这是@KafkaListener默认使用的容器工厂)ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);return factory;}@Configuration(proxyBeanMethods = false)@EnableKafka@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)static class EnableKafkaConfiguration {}}
http://www.dtcms.com/wzjs/425348.html

相关文章:

  • wordpress输入密码查看内容网站seo是什么意思
  • 企业网站必须备案app拉新推广平台有哪些
  • app投放渠道有哪些sem优化软件哪家好
  • 聊城网站建设价位自己怎么创建网站
  • 做网站怎么看效果游戏推广公司靠谱吗
  • 郑州做企业网站营销qq
  • 金华网站建设yw126短视频拍摄剪辑培训班
  • 企业网站要怎么做免费男女打扑克的软件
  • wordpress导航菜单特效seo推广关键词公司
  • 设计公司门头设计抖音seo推荐算法
  • 专卖二手手表网站芜湖网络营销公司
  • 百度统计搜索词为什么有与网站不相关的词推广的渠道和方法有哪些
  • 农业网站建设关键词排名监控批量查询
  • 国内外电子政务网站建设差距购买友情链接网站
  • 公司网站设计主页部分怎么做南昌seo网站排名
  • 深圳专业网站建设公司关键词seo如何优化
  • 乐清门户网站百度排名点击
  • 网站开发研究现状百度客服人工服务电话
  • 技术支持 东莞网站建设舞蹈培训seo和竞价排名的区别
  • 互联网100个创业项目名称武汉久都seo
  • 普通电脑如何做网站服务器吗深圳百度推广优化
  • 兰州网站建设北京如何优化搜索引擎
  • 政府采购网上商城电商网站seo最新优化方法
  • 网站定制合同广东近期新闻
  • wordpress 点击 代码新手如何学seo
  • 定制相册哪个网站好seo线下培训机构
  • 公司自己做网站appstore关键词优化
  • 网站建设新闻 常识外贸网络推广营销
  • 刷网站排名 优帮云苹果看国外新闻的app
  • 结构设计在哪个网站接单兼职做网络公司网页设计