当前位置: 首页 > wzjs >正文

什么公司做网站好小程序

什么公司做网站好,小程序,企业建设企业网站的好处有哪些,国内做家具外贸的网站6.1 消息存储机制 日志分段(Log Segment) Kafka的消息日志以分段(Segment)形式存储,每个Segment包含一个数据文件(.log)和两个索引文件(.index和.timeindex)&#xff1…

6.1 消息存储机制

日志分段(Log Segment)

Kafka的消息日志以分段(Segment)形式存储,每个Segment包含一个数据文件(.log)和两个索引文件(.index和.timeindex):

  • 数据文件:按时间顺序存储消息的二进制内容。
  • 偏移量索引文件:记录消息的Offset到物理位置的映射。
  • 时间戳索引文件:记录时间戳到Offset的映射。

日志分段文件示例:

00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000100000.log
00000000000000100000.index
00000000000000100000.timeindex

文件名前缀为该Segment的起始Offset。

磁盘顺序读写优化

Kafka利用操作系统的页缓存(Page Cache)和零拷贝(Zero Copy)技术提升性能:

  1. 页缓存:消息写入时先写入Page Cache,由操作系统异步刷盘,避免频繁IO。
  2. 零拷贝:Consumer消费消息时,数据直接从Page Cache传输到网络套接字,无需经过用户空间,减少数据拷贝次数。

数据删除策略

Kafka支持两种日志清理策略:

  • 基于时间:删除超过log.retention.hours的日志段。
  • 基于大小:当日志总大小超过log.retention.bytes时,删除最早的日志段。

清理流程由Log Cleaner线程后台执行,采用标记-清除算法:

// 伪代码:Log Cleaner工作流程
while (true) {// 选择需要清理的日志段LogSegment segment = selectSegmentToClean();// 创建清理后的临时日志段LogSegment cleanedSegment = new LogSegment();// 遍历原始日志段,保留最新版本的消息for (Message message : segment) {if (isLatestVersion(message)) {cleanedSegment.append(message);}}// 替换原始日志段replaceSegment(segment, cleanedSegment);
}

6.2 网络通信协议

Kafka自定义协议

Kafka客户端与Broker之间通过TCP协议通信,使用自定义二进制协议:

  • 请求格式:包含请求头(Request Header)和请求体(Request Body)。
    • 请求头:包含API Key(标识请求类型)、API Version、Correlation ID(用于匹配响应)等。
    • 请求体:具体请求参数,如ProduceRequest、FetchRequest等。
  • 响应格式:与请求类似,包含响应头和响应体。

TCP连接管理

  • Producer连接:Producer通过bootstrap.servers配置连接到任意Broker,获取集群元数据后,直接与目标Topic的Leader Partition所在Broker建立连接。
  • Consumer连接:Consumer同样先获取元数据,然后根据分区分配结果,与对应Broker建立连接。

心跳机制

Consumer Group通过心跳机制维持与Coordinator的连接:

  1. 心跳线程:Consumer内部有一个专门的心跳线程,定期向Coordinator发送心跳请求。
  2. Session超时:若Coordinator在session.timeout.ms(默认10秒)内未收到心跳,认为Consumer已下线,触发Rebalance。
  3. Poll间隔:Consumer必须在max.poll.interval.ms(默认300秒)内调用poll()方法,否则也会触发Rebalance。

心跳机制源码关键部分:

// KafkaConsumer中的心跳线程
private class HeartbeatThread extends Thread {public void run() {while (!closed) {try {// 发送心跳sendHeartbeat();// 休眠heartbeat.interval.msThread.sleep(heartbeatIntervalMs);} catch (InterruptedException e) {// 处理中断}}}
}

6.3 源码导读

核心模块概述

Kafka源码主要分为以下模块:

  • clients:客户端实现,包括Producer、Consumer、AdminClient等。
  • core:Broker核心实现,包括请求处理、日志管理、副本同步等。
  • streams:流处理框架实现。
  • connect:数据集成工具实现。
  • tools:命令行工具。

Producer启动流程

  1. 初始化阶段
    // KafkaProducer初始化流程
    public KafkaProducer(Properties properties) {// 配置解析config = new ProducerConfig(properties);// 元数据管理器metadata = new Metadata(config.metadataMaxAgeMs());// 网络客户端client = new NetworkClient(...);// 记录累加器(消息缓冲区)accumulator = new RecordAccumulator(...);// 发送线程sender = new Sender(client, metadata, accumulator, ...);ioThread = new Thread(sender, "kafka-producer-network-thread");ioThread.start();
    }
    
  2. 消息发送阶段
    // 消息发送流程
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {// 序列化键和值byte[] serializedKey = keySerializer.serialize(record.topic(), record.key());byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value());// 确定分区int partition = partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);// 将消息添加到累加器RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback);// 如果批次已满,唤醒Sender线程发送if (result.batchIsFull || result.newBatchCreated) {this.sender.wakeup();}return result.future;
    }
    

Consumer元数据获取

Consumer启动时获取集群元数据的关键流程:

// 获取元数据的核心方法
private Cluster metadataFetch() {// 标记元数据需要更新metadata.requestUpdate();// 阻塞等待元数据更新完成long begin = time.milliseconds();long remainingWaitMs = metadataTimeout;do {// 发送元数据请求sendMetadataRequest();// 处理响应client.poll(remainingWaitMs, begin);// 检查元数据是否更新if (metadata.updateRequested()) {Cluster cluster = metadata.fetch();if (cluster != null)return cluster;}remainingWaitMs = metadataTimeout - (time.milliseconds() - begin);} while (remainingWaitMs > 0);throw new TimeoutException("Failed to update metadata after " + metadataTimeout + " ms.");
}

Broker请求处理

Broker处理客户端请求的核心类是KafkaApis,它通过多线程池实现请求的并发处理:

// KafkaApis处理请求的主循环
public void handle(RequestChannel.Request request) {try {switch (request.header.apiKey()) {case PRODUCE:handleProduceRequest(request);break;case FETCH:handleFetchRequest(request);break;case METADATA:handleMetadataRequest(request);break;// 其他请求类型处理default:request.responseChannel.sendResponse(new RequestChannel.Response(request, new ApiError(Errors.UNSUPPORTED_VERSION, "")));}} catch (Exception e) {// 异常处理}
}

http://www.dtcms.com/wzjs/119199.html

相关文章:

  • 苏州网站制作出名 乐云践新搜索引擎优化是指什么意思
  • 制作公众号的平台seo算法入门教程
  • 公司做影视网站侵权站长工具官网查询
  • 网站的原型图seo岗位有哪些
  • 最好的开发网站建设价格搜索引擎排名优化建议
  • seo sem 做网站大众点评seo关键词优化
  • 网站联系我们模板武汉百度推广电话
  • 建议自考还是成考哈尔滨推广优化公司
  • 简单建网站千锋培训机构官网
  • 网站建设竞价托管外包掌门一对一辅导官网
  • 哪个网站可以学做咸菜seo是什么学校
  • 佛山网站优化多少钱如何做平台推广赚钱
  • 网站建设方案书原件做营销型网站哪家好
  • 远丰做网站怎么样网络优化工程师是干什么的
  • 怎么联网访问自己做的网站手机地图app下载安装
  • 和外国人做ic生意的网站网站平台如何推广
  • 可靠的政府网站建设万能浏览器
  • 义乌外发加工网是正规的吗优化关键词排名工具
  • 做网站销售说辞注册google账号
  • 报纸网站建设上海seo培训中心
  • 德惠网站谷歌seo是做什么的
  • wordpress里面主题文档怎么编辑黄山搜索引擎优化
  • 二级域名网站可以做360推广专业网站建设公司
  • 做落地页的网站北京搜索优化排名公司
  • 信阳搜索引擎优化优化新十条
  • 出效果图西安优化排名推广
  • 苏州企业网站建站系统关键词
  • 教育培训类网站开发公司网络推广排名定制
  • 网站首页html代码重庆高端网站seo
  • 哪些网站可以在线做动图深圳网络络推广培训