当前位置: 首页 > wzjs >正文

网站建设就业方向王通seo

网站建设就业方向,王通seo,浙江省政府加强政府网站建设方案,深圳高端网站建设电话kafka自定义了一套网络协议,遵守这个协议的格式,就可以像kafka推送消息或者拉取消息 kafka生产者主要实现的功能: 同步/异步发送消息 批量发送消息 超时重发 生产者代码步骤 1.明确发送时同步还是异步;2.明确kafka服务端的主机…

kafka自定义了一套网络协议,遵守这个协议的格式,就可以像kafka推送消息或者拉取消息 kafka生产者主要实现的功能:

  • 同步/异步发送消息

  • 批量发送消息

  • 超时重发

生产者代码步骤
  • 1.明确发送时同步还是异步;2.明确kafka服务端的主机名和端口号;3.明确发送的topic;4.循环分析:要是异步发送,生产者发送消息给kafka服务端,然后忙自己的事,收到服务端发来的ack确认消息后,会调用回调函数;同步机制:生产者发送消息给kafka服务端,阻塞等待,收到服务端发来的ack确认消息后,会调用回调函数

  • 具体过程:消息发送过程,涉及两个线程协同工作

  • 主线程:将数据封装成ProducerRecord对象,调用send()将消息存在RecordAccumulator中

  • Sender线程:从 RecordAccumulator 中不断拉取数据并发送到 Kafka,并进行响应处理 从RecordAccoumulator中批量取出消息(都是同一topic+分区)构成ProducerBatch,取出多个ProducerBatch封装成ClientRequest,将ClientRequest交给NetworkClient,NetworkClient将请求通过kafkachannel执行I/O线程批量发送给broker,收到响应后,调用ClientRequest的回调函数

kafkaProducer
  • 实现了四个方法

//1
send()将消息存入RecordAccumulator
//2
flush(),等待RecordAccumulator中的消息发送完全
//3
partitionsFor(),从Metadata(存储kafka集群的元信息)中获取制定Topic的分区信息
//4
close()关闭Producer对象,等待RecordAccumulator清空,关闭Sender线程
  • 重要字段解释(cv)

PRODUCER_CLIENT_ID_SEQUENCE:clientId的生成器
clientId:此生产者的唯一标识
partitioner:分区选择器
maxRequestSize:消息的最大长度,包含消息头
totalMemorySize:发送单个消息的缓冲区大小
accumulator:RecordAccumulator
sender:发送消息的Sender任务
ioThread:执行Sender任务发送消息的线程
compressionType:压缩算法
keySerializer:key的序列化器
valueSerializer:value的序列化器
Metadata metadata:整个Kafka集群的元数据
maxBlockTimeMs:等待更新Kafka集群元数据的最大时长
requestTimeoutMs:从消息发送到收到ACK响应的最长时长
interceptors:ProducerInterceptor集合,ProducerInterceptor可以在消息发送之前对其进行拦截或修改;也可以先于用户的Callback,对ACK响应进行预处理producerConfig:配置对象,使用反射初始化KafkaProducer配置的相对对象
  • 为什么要在kafkaProducer中维护kafka集群的元信息

  • 在创建ProducerRecord时,只需指定topic的名称,那么他是怎样去特定的分区的,那么就是元信息中存储着这个topic对应几个分区,经过特定算法可以知道目标分区,那么最终时需要知道目标分区的leader所在服务器的地址,端口,这些信息也存在元信息里面

  • 元信息具体数据(可以理解为通过结构体存储信息)

//Node表示一个集群的节点broker,Node记录这个节点host,ip,port
type Node struct {ID   intHost stringPort int...
}
//TopicPartition表示某个topic的分区,里面有topic名称,这个分区在这个topic的编号id
type TopicPartition struct {Topic     stringPartition int...
}
//PartitionInfo:分区详细信息,里面有topic string,partition int[和上面一样],还有leader Node[leader所在节点的id],replicas Node[]-所有副本所在的节点信息,inSyncReplicas-isr集合中所有节点信息
type PartitionInfo struct {Topic           stringPartition       intLeader          NodeReplicas        []NodeInSyncReplicas  []Node...
}
  • 这些信息都包含在cluster集群中,这个cluster核心字段如下:

  • 1.nodes:kafka集群中节点列表([]Node)

  • 2.nodesById: BrokerId<->Node (map(int,Node))

  • 3.partitionByTopicPartition:TopicPartition<->PartitionInfo(map(TopicPartition,PartitionInfo))

  • 4.partitionsByTopic:Topic名字<->PartitionInfo(map(string,[]PartitionInfo))

  • 5.avaliablePartitionsByTopic:Topic名字<->PartitionInfo(map(string,[]PartitionInfo))

  • partitionsByTopic和avaliablePartitionsByTopic的区别是上面那个可以没有leader【在某些中间状态,leader宕机状态】,下面那个必须有leader

  • 6.partitionsByNode:Node<->Partitioninfo(map(int,[]PartitionInfo))

这些怎样用呢?1.查找所有broker的信息;2.通过broker的id找到他的Node信息;3.通过TopicPartition找到更加具体的PartitionInfo;4.找到topic下面的所有分区信息(可以是中间状态);5.找到topic下面的所有分区信息(必须有leader);6.找到broker这个下面的所有分区信息
Metadata封装了cluster,核心字段如下
  • topics:所有topic

  • version:cluseter集群版本号,每更新一次集群就会使他+1

  • metadaExpireMs:更新集群元信息间隔

  • refreshBackoffMs:两次更新集群元信息最小间隔

  • lastRefreshMa:上次更新元数据时间(失败和成功都包含)

  • lastSuccessfulRefreshMs:上次元数据更新成功时间

  • cluster:kafka集群元数据记录

  • needUpdate:是否强制更新元数据-还不太懂

  • listeners:监听Metadata更新的监听器集合-还不太懂

  • needMetadaForTopics:是否更新全部的topic的元数据


Metadata核心方法
//将needUpdate更新为true,目的:Sender线程运行时更新Metadata记录的元数据,然会version的值
requestUpdate() 
//通过version版本号来判断元数据是否更新成功,要是没有就阻塞 
awaitUpdate()
Metadata中的字段由主线程读取,Sender线程负责更新,因此必须要保证线程安全性,因此一般都要给他配置sync
waitOnMetadata()函数分析
  • 看自己需要的topic是否在Metadata里面,要是没在,就加进去

  • 获取topic中分区的详细信息,要是失败,调用requestUpdate(),唤醒Sender线程,使得Sender线程更新Metadata记录的元数据

  • 主线程调用awaitUpdate(),等待Sender线程更新元数据成功

  • 回到第二步直到获取partitioninfo分区信息成功

http://www.dtcms.com/wzjs/141371.html

相关文章:

  • 武汉网站建设哪家强抖音视频seo霸屏
  • 珠宝网站制作的理念外包seo公司
  • 网站页脚怎么做能好看点百度怎么发布自己的广告
  • 大学网站开发模板免费下载网推公司干什么的
  • 企业做网站哪个最好sem优化技巧
  • 企业网站建设的思路游戏推广赚佣金
  • 网站开发公司分析厦门seo网络推广
  • vs2017做网站上海网站seo快速排名
  • 网络服务通知标题优化怎么做
  • 装饰网站模版怎样把个人介绍放到百度
  • 在西部数码做的企业网站不能与阿里巴巴网站相连接怎样做网站推广啊
  • 微网站需要什么百度我的订单
  • 网站头部优化文字怎么做国际新闻消息
  • 做网上购物网站网页制作html代码
  • 最近军事新闻正规网站优化哪个公司好
  • 做营养的网站sem是什么意思呢
  • 网站建设及外包营销策划推广公司
  • 资源库网站建设的总结怎么在百度上发布自己的信息
  • 系统之家网站怎么做seo智能优化软件
  • 改革开放40周年网站发展建设厦门网站seo哪家好
  • 怎么建立本地网站安卓优化大师官网下载
  • 南山区做网站新媒体口碑营销案例
  • 老版建设银行网站今日头条号官网
  • 无忧网站建设报价媒介平台
  • wordpress用户登录教程搜索关键词排名优化服务
  • 音乐网站开发参考文献淘宝seo搜索引擎优化
  • 国外做衣服网站石家庄百度seo排名
  • 网站备案归百度竞价托管代运营公司
  • 网站开发中怎么联系客服热门关键词
  • 廊坊模板网站建设ai智能搜索引擎