当前位置: 首页 > wzjs >正文

网站建设怎么用长尾做标题免费域名申请网站

网站建设怎么用长尾做标题,免费域名申请网站,wordpress网仿站,做网站的创始人依赖项和配置信息参见另一篇博文KafkaListener的配置使用,这里主要借助源码分析KafkaListener和KafkaTemplate自动装配原理。 1、KafkaAutoConfiguration 源码分析 KafkaAutoConfiguration类自动装配生成了生产者客户端KafkaTemplate的bean和消费者基础ConsumerFa…

依赖项和配置信息参见另一篇博文@KafkaListener的配置使用,这里主要借助源码分析@KafkaListener和KafkaTemplate自动装配原理。

1、KafkaAutoConfiguration 源码分析

KafkaAutoConfiguration类自动装配生成了生产者客户端KafkaTemplate的bean和消费者基础ConsumerFactory的bean,KafkaAutoConfiguration导入KafkaAnnotationDrivenConfiguration,KafkaAnnotationDrivenConfiguration最终生成了ConcurrentKafkaListenerContainerFactory的bean, 该bean是@KafkaListener默认使用的容器工厂,即指定了消费的kafka集群。

package org.springframework.boot.autoconfigure.kafka;import java.io.IOException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;@Configuration(proxyBeanMethods = false
)
/// KafkaTemplate在类路径下存在时,加载该配置类
@ConditionalOnClass({KafkaTemplate.class})  
@EnableConfigurationProperties({KafkaProperties.class})
/// 将KafkaAnnotationDrivenConfiguration、KafkaStreamsAnnotationDrivenConfiguration配置类合并过来
@Import({KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class})  
public class KafkaAutoConfiguration {private final KafkaProperties properties;public KafkaAutoConfiguration(KafkaProperties properties) {this.properties = properties;}@Bean  /// KafkaTemplate的bean未定义时,自动生成该bean@ConditionalOnMissingBean({KafkaTemplate.class})public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}@Bean@ConditionalOnMissingBean({ProducerFactory.class})public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory(this.properties.buildProducerProperties());String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) -> {customizer.customize(factory);});return factory;}@Bean@ConditionalOnMissingBean({ProducerListener.class})public ProducerListener<Object, Object> kafkaProducerListener() {return new LoggingProducerListener();}@Bean/// ConsumerFactory的bean未定义时,自动生成该bean@ConditionalOnMissingBean({ConsumerFactory.class})public ConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) -> {customizer.customize(factory);});return factory;}@Bean@ConditionalOnProperty(name = {"spring.kafka.producer.transaction-id-prefix"})@ConditionalOnMissingBeanpublic KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {return new KafkaTransactionManager(producerFactory);}@Bean@ConditionalOnProperty(name = {"spring.kafka.jaas.enabled"})@ConditionalOnMissingBeanpublic KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();KafkaProperties.Jaas jaasProperties = this.properties.getJaas();if (jaasProperties.getControlFlag() != null) {jaas.setControlFlag(jaasProperties.getControlFlag());}if (jaasProperties.getLoginModule() != null) {jaas.setLoginModule(jaasProperties.getLoginModule());}jaas.setOptions(jaasProperties.getOptions());return jaas;}@Bean@ConditionalOnMissingBeanpublic KafkaAdmin kafkaAdmin() {KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());return kafkaAdmin;}
}

2、KafkaAnnotationDrivenConfiguration 源码分析

package org.springframework.boot.autoconfigure.kafka;import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;/*** Configuration for Kafka annotation-driven support.** @author Gary Russell* @author Eddú Meléndez*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {private final KafkaProperties properties;private final RecordMessageConverter messageConverter;private final BatchMessageConverter batchMessageConverter;private final KafkaTemplate<Object, Object> kafkaTemplate;private final KafkaAwareTransactionManager<Object, Object> transactionManager;private final ConsumerAwareRebalanceListener rebalanceListener;private final ErrorHandler errorHandler;private final BatchErrorHandler batchErrorHandler;private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;private final RecordInterceptor<Object, Object> recordInterceptor;KafkaAnnotationDrivenConfiguration(KafkaProperties properties,ObjectProvider<RecordMessageConverter> messageConverter,ObjectProvider<BatchMessageConverter> batchMessageConverter,ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,ObjectProvider<BatchErrorHandler> batchErrorHandler,ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {this.properties = properties;this.messageConverter = messageConverter.getIfUnique();this.batchMessageConverter = batchMessageConverter.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));this.kafkaTemplate = kafkaTemplate.getIfUnique();this.transactionManager = kafkaTransactionManager.getIfUnique();this.rebalanceListener = rebalanceListener.getIfUnique();this.errorHandler = errorHandler.getIfUnique();this.batchErrorHandler = batchErrorHandler.getIfUnique();this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();this.recordInterceptor = recordInterceptor.getIfUnique();}@Bean@ConditionalOnMissingBeanConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();configurer.setKafkaProperties(this.properties);MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH))? this.batchMessageConverter : this.messageConverter;configurer.setMessageConverter(messageConverterToUse);configurer.setReplyTemplate(this.kafkaTemplate);configurer.setTransactionManager(this.transactionManager);configurer.setRebalanceListener(this.rebalanceListener);configurer.setErrorHandler(this.errorHandler);configurer.setBatchErrorHandler(this.batchErrorHandler);configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);configurer.setRecordInterceptor(this.recordInterceptor);return configurer;}@Bean@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")  /// ConcurrentKafkaListenerContainerFactory的bean未定义时,自动生成该bean(这是@KafkaListener默认使用的容器工厂)ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);return factory;}@Configuration(proxyBeanMethods = false)@EnableKafka@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)static class EnableKafkaConfiguration {}}
http://www.dtcms.com/wzjs/82421.html

相关文章:

  • 在哪个网做免费网站好2023年百度小说风云榜
  • 展览 网站源码北京已感染上千万人
  • 做网站用什么浏览器最好爱站工具包下载
  • 网站维护更新热搜词排行榜关键词
  • 现在免费的外贸平台有哪些方法seo
  • 网站开发后端指什么恩城seo的网站
  • 电子商务网站开发与建设试卷宁波专业seo外包
  • 网页设置背景图片泰安网站建设优化
  • 天站网站建设网络营销与直播电商专业介绍
  • 找人做效果图那个网站网站友链
  • seo外贸网站建设网页快速收录
  • 搭建 网站 实例网站搭建费用
  • 郑州网站建设企业名录360优化大师软件
  • jsp和html做的招聘网站山东进一步优化
  • python做网站 jsp上google必须翻墙吗
  • 让其他公司做网站应注意什么常见的推广平台有哪些
  • 墨刀做网站手机怎么建网站
  • 东营网站建设收益高百度seo通科
  • 网页开发与网站开发chrome浏览器下载安卓手机
  • 可以做动画的网站官网seo关键词排名系统
  • 学生可做的网站主题宁德市属于哪个省
  • 用vps做网站的流程seo排名计费系统
  • 开发公司正式电未接通seo关键词快速排名
  • 哪些知名网站域名在国内注册在线外链
  • 用python做的网站多吗seo网站推广与优化方案
  • 哪个视频网站做直播销售免费友链互换
  • 中国购物网站大全排名漯河seo推广
  • 做批发各类新书的网站苏州seo排名优化课程
  • 管理手机网站模板郑州有没有厉害的seo顾问
  • 移动端网站如何开发快速开发平台