Linux C/C++ 学习日记(27):KCP协议(三):源码分析与使用示例
注:该文用于个人学习记录和知识交流,如有不足,欢迎指点。
源码见:Linux C/C++ 学习日记(26):KCP协议(二):kcp源码分享-CSDN博客
一、关键结构体
1.struct IKCPSEG
(KCP 数据分片,传输的基本单元)
关键参数(并非全部)
成员变量 | 含义说明 | 修改该成员的函数及场景 |
---|---|---|
conv | 会话编号(唯一标识一个 KCP 连接,类似 TCP 的会话标识) | - - |
cmd | 分片类型(IKCP_CMD_PUSH /ACK /WASK /WINS ) | - 发送数据时设为 - |
frg | 分片编号(消息分片时,从count-1 递减到 0,0 表示最后一个分片) | - ikcp_send :分片时根据总片数分配(消息模式下);流模式下固定为 0。 |
wnd | 发送方当前的接收窗口空闲大小(告知对方自己的接收能力) | - ikcp_flush :发送分片时设为ikcp_wnd_unused(kcp) 的返回值(rcv_wnd - nrcv_que )。 |
ts | 分片发送时的时间戳(用于对方计算 RTT) | - ikcp_flush :首次发送或重传时,设为当前时间戳kcp->current 。 |
sn | 分片序号(全局递增,唯一标识一个分片) | - ikcp_flush :将分片从snd_queue 移到snd_buf 时,分配为kcp->snd_nxt 并递增snd_nxt 。 |
una | 发送方已确认的最大序号(告知对方 “我已收到una 之前的所有分片”) | - ikcp_flush :发送分片时设为当前kcp->rcv_nxt (接收方下一个期望的序号)。 |
len | 分片的数据部分长度 | - ikcp_send :分片时根据 MSS 设置(不超过kcp->mss );- ikcp_parse_data :解析接收的分片时从数据包中提取。 |
resendts | 下次重传的时间戳(超时重传的触发时间) | - ikcp_flush :首次发送时设为current + rto + rtomin ;重传时更新为current + rto 。 |
rto | 该分片的重传超时时间(动态调整) | - ikcp_flush :首次发送时设为kcp->rx_rto ;重传时根据nodelay 模式调整(正常模式累加 RTO,无延迟模式累加 RTO/2)。 |
fastack | 被其他 ACK 跳过的次数(用于触发快速重传) | - ikcp_parse_fastack :当收到序号更大的 ACK 时,对中间未确认的分片递增此值。 |
xmit | 该分片的发送次数(包括首次和重传) | - ikcp_flush :首次发送或重传时递增(每次发送 + 1)。 |
2.struct IKCPCB
(KCP 控制块,管理会话状态)
关键参数(并非全部)
成员变量 | 含义说明 | 修改该成员的函数及场景 |
---|---|---|
conv | 会话编号(唯一标识当前 KCP 连接) | - ikcp_create :初始化时赋值,后续不变。 |
mtu | 最大传输单元(单个 UDP 包的最大大小,默认 1400 字节) | - - |
mss | 最大分片大小(mtu - IKCP_OVERHEAD ,默认 1376 字节) | - - |
state | 连接状态(0 = 正常,-1 = 链路断开) | - ikcp_flush :当分片重传次数xmit ≥ dead_link 时,设为-1 。 |
snd_una | 发送方第一个未确认的序号(发送窗口左边界) | - ikcp_shrink_buf :发送缓存非空时设为队头分片的sn ,否则设为snd_nxt ;该函数在ikcp_parse_ack /ikcp_parse_una 后调用。 |
snd_nxt | 发送方下一个待分配的序号(发送窗口右边界) | - ikcp_flush :将分片从snd_queue 移到snd_buf 时递增(分配sn )。 |
rcv_nxt | 接收方下一个期望的序号(接收窗口左边界) | - - |
rx_srtt | 平滑 RTT(往返时间) | - ikcp_update_ack :根据新测量的 RTT 更新(首次设为 RTT,后续加权平滑)。 |
rx_rttval | RTT 偏差(反映网络抖动) | - ikcp_update_ack :根据 RTT 与平滑 RTT 的差值更新(加权平滑)。 |
rx_rto | 重传超时时间(基于rx_srtt 和rx_rttval 计算) | - ikcp_update_ack :计算为rx_srtt + 4*rx_rttval ,并限制在[rx_minrto, IKCP_RTO_MAX] 。 |
rx_minrto | 最小 RTO(避免过小的 RTO 导致冗余重传) | - - |
snd_wnd | 发送窗口大小(本地允许的未确认分片最大数量) | - - |
rcv_wnd | 接收窗口大小(本地可缓存的乱序分片最大数量) | - - |
rmt_wnd | 远端告知的接收窗口大小(对方的接收能力) | - ikcp_input :解析任何分片时,从wnd 字段提取并更新。 |
cwnd | 拥塞窗口大小(动态调整,限制实际发送速率) | - - |
probe | 窗口探测标记(IKCP_ASK_SEND /IKCP_ASK_TELL ) | - - - |
interval | 状态刷新间隔(ikcp_update 的调用周期,默认 100ms) | - - |
ts_flush | 下一次调用ikcp_flush 的时间戳 | - ikcp_update :首次调用时初始化,之后按interval 递增(超时则校正)。 |
nrcv_buf | 接收缓存(rcv_buf )中的分片数量 | - ikcp_parse_data :添加新分片时递增,移除有序分片时递减。 |
nsnd_buf | 发送缓存(snd_buf )中的分片数量 | - - |
nrcv_que | 接收队列(rcv_queue )中的分片数量(可交付给用户) | - - |
nsnd_que | 待发送队列(snd_queue )中的分片数量(未进入发送窗口) | - - |
nodelay | 无延迟模式开关(0 = 关闭,1 = 开启) | - - |
ackcount | ACK 列表中待发送的 ACK 数量 | - - |
fastresend | 快速重传触发阈值(被跳过的 ACK 数量) | - - |
nocwnd | 拥塞控制开关(0 = 启用,1 = 禁用) | - - |
3.struct IQUEUEHEAD
成员变量 | 含义说明 | 修改该成员的函数及场景 |
---|---|---|
next | 指向链表中的下一个节点(双向链表的后向指针) | - - - |
prev | 指向链表中的上一个节点(双向链表的前向指针) | - - - |
二、函数
1. 基础工具函数(跨平台与数值运算)
函数名 | 功能描述 | 操作细节 | 核心作用 |
---|---|---|---|
ikcp_encode_seg | 编码 KCP 分段为字节流(用于组装发送数据包) | 按顺序调用
| 把 KCP 分段(IKCPSEG )的控制信息与数据转化为符合协议格式的字节流,为底层传输(如 UDP 发送)提供可直接发送的数据,是 “KCP 逻辑→网络数据包” 的编码核心步骤 |
ikcp_encode8u | 编码 8 位无符号整数 | 直接将 8 位数据写入指针,指针后移 1 字节 | 确保 8 位数据跨平台传输一致性,用于协议头基础字段(如cmd /frg )读写 |
ikcp_decode8u | 解码 8 位无符号整数 | 直接从指针读取 8 位数据,指针后移 1 字节 | 同上,解析 8 位协议头字段(如cmd /frg ) |
ikcp_encode16u | 编码 16 位无符号整数(小端序) | 大端 / 需对齐系统手动拆分低 8 位→高 8 位;小端系统直接内存拷贝,指针后移 2 字节 | 解决 16 位数据(如wnd )大小端差异,保证协议头(如接收窗口大小)解析一致 |
ikcp_decode16u | 解码 16 位无符号整数(小端序) | 大端 / 需对齐系统手动拼接低 8 位→高 8 位;小端系统直接内存拷贝,指针后移 2 字节 | 同上,解析 16 位协议头字段(如接收窗口大小) |
ikcp_encode32u | 编码 32 位无符号整数(小端序) | 大端 / 需对齐系统手动拆分 4 字节(低→高);小端系统直接内存拷贝,指针后移 4 字节 | 解决 32 位核心数据(如conv /sn /ts )大小端差异,为协议关键字段(会话 ID、序号、时间戳)读写提供基础 |
ikcp_decode32u | 解码 32 位无符号整数(小端序) | 大端 / 需对齐系统手动拼接 4 字节(低→高);小端系统直接内存拷贝,指针后移 4 字节 | 同上,解析 32 位核心协议字段(如会话 ID、序号、时间戳) |
_imin_ | 取两个无符号整数的最小值 | 直接比较返回较小值 | 窗口控制、拥塞控制中取边界值(如发送窗口与远端窗口的最小值) |
_imax_ | 取两个无符号整数的最大值 | 直接比较返回较大值 | 边界判断(如 RTT 偏差与刷新间隔的最大值) |
_ibound_ | 将数值限制在 [lower, upper] 范围内 | 先取与lower 的最大值,再取与upper 的最小值 | 确保参数合法性(如 RTO 限制在rx_minrto 与IKCP_RTO_MAX 之间) |
_itimediff | 计算两个时间戳的差值(支持 32 位无符号回绕) | 强制转换为有符号整数计算later - earlier | 超时判断、RTT 计算(如判断重传时间是否到达、计算往返时间) |
2. 内存与分段管理函数(资源分配释放)
函数名 | 功能描述 | 操作细节 | 核心作用 |
---|---|---|---|
ikcp_malloc | 内部内存分配函数 | 优先调用用户自定义分配器(ikcp_malloc_hook ),无则用系统malloc | 为 KCP 所有资源(分段、缓冲区、ACK 列表)分配内存,支持自定义内存策略 |
ikcp_free | 内部内存释放函数 | 优先调用用户自定义释放器(ikcp_free_hook ),无则用系统free | 释放ikcp_malloc 分配的内存,避免内存泄漏 |
ikcp_allocator | 设置自定义内存分配器,替换默认malloc /free | 将用户提供的分配 / 释放函数指针赋值给ikcp_malloc_hook /ikcp_free_hook | 支持用户接入内存池,优化高频内存操作性能(如游戏、高并发场景) |
ikcp_segment_new | 创建 KCP 数据分段(IKCPSEG ,含协议头 + 数据缓冲区) | 分配sizeof(IKCPSEG) + size 内存(size 为数据长度),返回分段指针 | 生成数据传输的基础单元,承载协议头与用户数据 |
ikcp_segment_delete | 释放 KCP 数据分段 | 调用ikcp_free 回收分段内存 | 销毁无用分段(如已确认的发送分段、重复的接收分段),释放内存资源 |
3. KCP 控制块管理函数(核心对象生命周期)
ikcp_create、ikcp_release、ikcp_setoutput
函数名 | 功能描述 | 操作细节 | 核心作用 |
---|---|---|---|
ikcp_create | 创建 KCP 控制块(ikcpcb ,管理单个连接的所有状态) | 1. 核心参数初始化:设置会话 ID( 2. 队列与缓存初始化:创建 4 个核心双向队列( 4. 返回结果:成功时返回 | 初始化 KCP 连接的核心对象,为协议运行提供基础数据结构(队列、缓冲区)与参数(窗口、RTT 等),是 KCP 会话的 “启动入口”。 |
ikcp_release | 释放 KCP 控制块及所有关联资源 | 1. 队列资源清理:遍历并清空 4 个队列( 2. 缓冲区与 ACK 列表释放:释放发送缓冲区( 3. 控制块释放:调用 | 销毁 KCP 连接,回收所有动态分配的内存资源(分段、缓冲区、控制块等),避免内存泄漏,是 KCP 会话的 “终止出口”。 |
ikcp_setoutput | 设置数据输出回调(KCP 发送数据时调用,通常绑定 UDP 发送) | 将用户提供的输出函数指针(原型为int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user) )赋值给ikcpcb 的output 成员。 | 打通 KCP 与底层传输层(如 UDP)的接口,让 KCP 协议栈能通过该回调函数将封装好的数据包发送到网络,是 “协议逻辑→网络传输” 的关键桥梁。 |
4. 数据发送与接收函数(用户层接口,与队列的交互)
ikcp_send、ikcp_recv、ikcp_peeksize
函数名 | 功能描述 | 操作细节 | 核心作用 |
---|---|---|---|
ikcp_send | 上层发送接口:将用户数据分片并加入待发送队列 | 1. 流模式优化:若启用流模式( 3. 分片有效性校验:分片数量不得超过接收窗口大小( 4. 分片创建与入队:为每个分片分配内存,设置 | 处理用户发送的数据,将其拆分为符合 KCP 协议的分片并缓存,是用户数据进入 KCP 协议栈的入口,为后续ikcp_flush 的发送流程做准备。 |
ikcp_recv | 上层接收接口:从接收队列读取已重组的完整用户数据 | 1. 接收队列空检查:检查接收队列( 2. 数据长度预览与校验:通过 3. 数据重组与拷贝:按 4. 有序分片转移:将接收缓存( 5. 窗口通知标记:若接收窗口从 “满状态” 变为 “有空闲”,标记需要告知对方(`probe= IKCP_ASK_TELL`),让远端继续发送数据。 | 向用户提供完整的接收数据,管理接收缓存与队列的状态流转,是用户层获取 KCP 可靠传输后数据的出口。 |
ikcp_peeksize | 预览接收队列中下一条完整消息的总长度 | 遍历接收队列( - 若为单分片消息( - 若为多分片消息,校验队列中分片数是否≥ | 辅助ikcp_recv 判断用户提供的缓冲区是否足够容纳下一条完整消息,避免数据截断,是接收流程的 “预检查” 环节。 |
5. 输入数据处理函数(UDP 数据包解析,用户层接口)
ikcp_input
函数名 | 功能描述 | 操作细节 | 核心作用 |
---|---|---|---|
ikcp_input | 解析接收到的 UDP 数据包,处理 KCP 协议逻辑(ACK 确认、数据分片、窗口探测 / 通知等),维护协议状态与拥塞控制 | 1. 数据校验:检查数据长度(至少包含 24 字节 KCP 协议头)和会话编号( 2. 循环解析分片:逐一分片解析,直到剩余数据不足协议头长度: - ACK 分片( - 数据分片( - 窗口探测分片(
| 是 KCP 接收逻辑的核心入口,处理底层 UDP 数据,解析各类 KCP 分片,维护发送 / 接收状态(窗口、RTT、ACK 记录等),并驱动拥塞控制,保障数据可靠传输与流量适配。 |
ikcp_parse_data
函数名 | 功能描述 | 操作细节 | 核心作用 |
---|---|---|---|
ikcp_parse_data | 处理接收的新分片:去重、排序,将有序分片转移至接收队列,为上层读取完整数据做准备 | 1. 窗口范围校验:检查分片序号( 2. 去重处理:遍历接收缓存( 3. 有序分片转移:循环检查 | 维护接收数据的有序性与完整性,将乱序接收的分片暂存于rcv_buf 并逐步排序,最终把连续有序的分片转移到rcv_queue ,确保上层ikcp_recv 能读取到完整、连续的用户数据。 |
6. 发送驱动与调度函数(数据发送核心)
函数名 | 功能描述 | 操作细节 | 核心作用 |
---|---|---|---|
ikcp_flush | 实际发送数据(ACK 包、探测包、数据分片),处理重传和拥塞控制 | 1. 发送 ACK 包:将 2. 窗口探测: 若远端窗口( 若需告知本地窗口状态,发送窗口通知包( 3. 数据分片发送: - 将发送队列( - 处理重传:首次发送的分片直接发送;超时未确认的分片重传(更新 RTO);被跳过的 ACK 次数达阈值时触发快速重传; 4. 拥塞控制:根据重传情况(快速重传 / 超时重传)调整拥塞窗口( | 驱动实际数据发送,处理重传逻辑(超时重传、快速重传),并通过拥塞控制适应网络状况,是 KCP 实现可靠传输的核心发送逻辑。 |
ikcp_update | 定时驱动 KCP 状态更新(核心调度函数) | 1. 记录当前时间戳( 2. 若当前时间达到预设的刷新时间( 3. 调整下一次刷新时间( | 按固定间隔触发 KCP 状态更新和数据发送,是驱动整个 KCP 协议逻辑运行的 “定时器”,保障重传检测、窗口探测等逻辑的周期性执行。 |
ikcp_check | 计算下一次需要调用ikcp_update 的时间,优化调度频率 | 1. 综合下一次刷新时间( 2. 取上述时间中的最小值作为 “最早需要调度的时间差”,且该时间差不超过配置的刷新间隔( 3. 返回 “当前时间 + 最早调度时间差” 的时间戳。 | 减少不必要的ikcp_update 调用,仅在真正需要处理协议逻辑(如到达刷新时间、分片需重传)时触发,优化高并发场景下的性能(避免 CPU 空转)。 |
7. ACK 与 RTT 相关函数(可靠性基础)
函数名 | 功能描述 | 操作细节 | 核心作用 |
---|---|---|---|
ikcp_update_ack | 根据 RTT 更新平滑 RTT、RTT 偏差和 RTO | 1. 首次测量: 2. 后续: - - `rx_rttval=(3*rx_rttval+RTT-rx_srtt)/4`; 3.RTO=rx_srtt+4*rx_rttval,限制范围 | 动态调整 RTO,让重传既及时又避免冗余,是可靠性的核心参数计算 |
ikcp_shrink_buf | 更新未确认序号(snd_una ),指向发送缓存最小未确认序号 | 1. 发送缓存非空:取队头分片 2. 缓存为空:设为 | 标记发送窗口左边界,明确未确认数据范围 |
ikcp_parse_ack | 处理单个 ACK:删除发送缓存中对应序号的分片 | 1. 检查 2. 遍历 | 释放已确认的发送资源,更新发送窗口 |
ikcp_parse_una | 处理批量 ACK(una ):删除发送缓存中所有序号小于una 的分片 | 遍历snd_buf ,删除sn < una 的分片,直到sn ≥ una | 批量释放已确认资源,提升效率 |
ikcp_parse_fastack | 统计被跳过的分片,用于触发快速重传 | 1. 检查 2. 遍历 | 为快速重传提供判断依据,减少重传延迟 |
ikcp_ack_push | 将需要确认的sn 和ts 加入 ACK 列表,用于批量发送 | 1. 容量不足时按 2 的幂扩容; 2. 加入 | 批量发送 ACK,减少小数据包数量,提升网络效率 |
ikcp_ack_get | 从 ACK 列表获取指定位置的sn 和ts | 通过索引p 读取acklist[p*2] (sn)和acklist[p*2+1] (ts) | 辅助ikcp_flush 批量编码 ACK 分片 |
8. 配置与控制函数(协议参数调整)
函数名 | 功能描述 | 操作细节 | 核心作用 |
---|---|---|---|
ikcp_setmtu | 设置 MTU(最大传输单元),计算 MSS(MSS=MTU-IKCP_OVERHEAD ) | 1. 校验 MTU≥50 且≥头部开销; 2. 分配新缓冲区; 3. 更新 | 适配不同网络的 MTU,避免 IP 分片,提升传输效率 |
ikcp_interval | 设置 KCP 状态刷新间隔(interval ) | 限制间隔在 10~5000 毫秒,更新kcp->interval | 调整协议处理频率,平衡延迟与 CPU 占用 |
ikcp_nodelay | 配置无延迟模式、刷新间隔、快速重传阈值、是否禁用拥塞控制 | 1. 2. 调整 | 优化实时场景(如游戏)的延迟,按需关闭拥塞控制 |
ikcp_wndsize | 设置发送窗口(snd_wnd )和接收窗口(rcv_wnd ) | 1. 发送窗口直接更新; 2. 接收窗口不小于 | 调整流量控制力度,平衡吞吐量与缓存占用 |
ikcp_waitsnd | 获取等待发送的数据包总数(发送缓存 + 待发送队列) | 返回kcp->nsnd_buf + kcp->nsnd_que | 告知用户当前发送队列积压情况,辅助流量控制 |
ikcp_getconv | 从 KCP 数据包中解析会话 ID(conv ) | 解码数据包头部第一个 32 位整数(conv ) | 用于多连接场景,区分不同 KCP 会话的数据包 |
9. 日志与输出相关函数(调试与监控)
函数名 | 功能描述 | 操作细节 | 核心作用 |
---|---|---|---|
ikcp_log | 通过用户回调输出日志,支持掩码过滤 | 1. 检查日志掩码匹配且回调存在; 2. 格式化日志后调用 | 提供协议运行日志,辅助调试(如数据发送 / 接收、重传、RTT 变化) |
ikcp_canlog | 检查是否允许记录指定类型的日志 | 判断(mask & kcp->logmask) != 0 且kcp->writelog != NULL | 避免无效日志格式化操作,提升性能 |
ikcp_output | 调用用户输出回调发送数据(底层传输接口) | 1. 检查回调存在; 2. 记录输出日志(若允许); 3. 调用 | 封装底层传输调用,让 KCP 与具体传输层(如 UDP)解耦 |
ikcp_qprint | 调试用:打印队列中分片的序号和时间戳(默认关闭) | 遍历队列,格式化输出每个分片的sn 和ts%10000 (仅#if 1 时生效) | 调试队列状态(如分片排序、积压情况),仅用于开发阶段 |
10. 数据解析与转移函数(分段处理)
函数名 | 功能描述 | 操作细节 | 核心作用 |
---|---|---|---|
ikcp_encode_seg | 将 KCP 分段编码为字节流(用于发送) | 按顺序编码conv 、cmd 、frg 、wnd 、ts 、sn 、una 、len 字段 | 生成符合 KCP 协议格式的字节流,为底层发送做准备 |
ikcp_wnd_unused | 计算接收窗口的空闲大小(可接收的新分片数量) | 返回kcp->rcv_wnd - kcp->nrcv_que (窗口满则返回 0) | 告知远端当前接收能力,用于流量控制(如 ACK 中携带窗口大小) |
三、运作逻辑
1. ack是如何发送的
recvfrom (用户调用)
-> ikcp_input (用户调用)
-> cmd == IKCP_CMD_PUSH
-> if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0)
-> ikcp_ack_push(kcp, sn, ts); (推到acklists)
同时if (_itimediff(sn, kcp->rcv_nxt + kcp- >rcv_wnd) <0),ikcp_parse_data(kcp, seg);
-> ikcp_update(用户调用)
-> ikcp_flush
-> for (i = 0; i < kcp->ackcount; i++) ikcp_output(kcp, buffer, size);
2. 收到ack、una、是如何删除发送缓存区的
recvfrom (用户调用)
-> ikcp_input (用户调用)
-> 解析kcp头
-> 得到una
-> ikcp_parse_una、ikcp_shrink_buf
-> cmd == IKCP_CMD_ACK
-> ikcp_parse_ack、ikcp_shrink_buf
3. RTT、RTO是如何更新的
kcp系统级别(第一次重传时间):
recvfrom (用户调用)
-> ikcp_input (用户调用)
-> cmd == IKCP_CMD_ACK
-> if (_itimediff(kcp->current, ts) >= 0)
-> ikcp_update_ack (更新RTT、RTO)
单个包多次超时重传自己本身的RTO更新
-> ikcp_update(用户调用)
-> ikcp_flush
-> for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next)
-> else if (_itimediff(current, segment->resendts) >= 0)
-> segment->xmit++; 发送次数+1
-> // 正常模式:RTO += max(当前RTO, 最新RTO)(快速增长,避免频繁重传)
if (kcp->nodelay == 0) segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
// 无延迟模式:RTO += RTO/2( slower增长,更快重传)
else IINT32 step = (kcp->nodelay < 2) ? ((IINT32)(segment->rto)) : kcp->rx_rto;
segment->rto += step / 2;
-> segment->resendts = current + segment->rto; // 更新重传时间
4. 窗口大小的更新
接收窗口: kcp->rcv_wnd (为常量)
空闲窗口:wnd:kcp->rcv_wnd - kcp->nrcv_que (为变量)
接收窗口: kcp->snd_wnd (为常量)
拥塞窗口:kcp->cwnd (为变量)
远程窗口(对方):kcp->rmt_wnd(为变量)
实际发送窗口cwnd:取接收窗口、远程窗口、拥塞窗口kcp->rmt_wnd(若开启)三者的最小值
发送缓存最大为实际发送窗口的大小
判断序号能否插入send_buf:_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0
接收队列长度最大为接收窗口大小
判断序号能否插入recv_buf: _itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0
1. ikcp_input 中的更新:
recvfrom (用户调用)
-> ikcp_input (用户调用)
-> 解析kcp头
-> 得到远端窗口大小 rmt_wnd
-> kcp->rmt_wnd = rmt_wnd; 更新对方远端窗口大小
-> 处理完该UDP包中所有的KCP包之后
-> if (_itimediff(kcp->snd_una, prev_una) > 0)
-> 更新kcp->cwnd 更新拥塞窗口
2. ikcp_flush中的更新
-> ikcp_update(用户调用)
-> ikcp_flush
-> 初始化通用KCP头
-> // 更新传给对方的空闲窗口( kcp->rcv_wnd - kcp->nrcv_que)
seg.wnd = ikcp_wnd_unused(kcp);
-> 处理完ACK、ASKW的发送后
-> 更新实际发送窗口大小
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
// 如果启用拥塞控制,再受拥塞窗口限制
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
-> for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next)
-> 发送完send_buf的数据后
-> if (change) 如果有快传
-> 更新kcp->cwnd 更新拥塞窗口
-> if (lost) 如果有重传
-> 更新kcp->cwnd 更新拥塞窗口
5. 快速重传是如何实现的
recvfrom (用户调用)
-> ikcp_input (用户调用)
-> 解析kcp头
-> while (1) 遍历一个UDP数据(内含多个KCP包)
-> cmd == IKCP_CMD_ACK
-> 记录max_ack
-> 遍历结束
-> if (flag != 0) 如果收到ACK
-> ikcp_parse_fastack 处理快速重传统计(统计被跳过的分段,seg->fastack++)
-> ikcp_update(用户调用)
-> ikcp_flush
-> ...
-> for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next)
-> else if (segment->fastack >= resent)
-> ikcp_output
6. 怎么判断连接状态
当 segment->xmit > kcp->dead_link ,即该分片的重传次数(超时重传与快速重传)大于最大值(20次),则判定连接断开
ikcp_update(用户调用)
-> ikcp_flush
-> ...
-> 发送单个分片完毕后
-> if (segment->xmit >= kcp->dead_link)
-> kcp->state = (IUINT32)-1; // 标记链路断开 (0为正常连接)
注意需要用户自己手动获取state的值,然后选择是否关闭(dnag
7. 用户如何收发数据
kcp->snd_nxt: snd_que的第一个数据
将数据插入snd_que的条件:无
将snd_que的数据插入snd_buf的条件:_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0
kcp ->recv_nxt: recv_que期望收到的数据
将数据插入recv_buf的条件:_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0
将数据从recv_buf插入recv_que的条件:满足sn == kcp ->recv_nxt 且 当前接收队列小于接收窗口
发送数据
ikcp_send (用户调用) : 将数据存到snd_que
-> ikcp_update(用户调用)
-> ikcp_fllush : 将数据有选择的从snd_que存到snd_buf
-> output : 按需发送snd_buf中的数据
接收数据
ikcp_input(用户调用):接续KCP包
-> cmd == IKCP_CMD_PUSH
-> ikcp_parse_data : 按需插入recv_buf、然后按需将recv_buf的数据插入recv_que
-> ikcp_recv (用户调用): 按需取recv_que的数据、然后按需将ecv_buf的数据插入recv_que
8. 字节流与报文流的差异
差异点 | 报文流模式(stream=0 ,默认) | 字节流模式(stream=1 ) |
---|---|---|
消息边界 | 保留消息边界:每个ikcp_send 调用对应一个完整消息,接收方按消息粒度交付。 | 无消息边界:多次ikcp_send 的数据会被合并为连续字节流,类似 TCP。 |
分片标识(frg ) | frg 从n-1 递减到0 (n 为分片数),0 表示消息最后一个分片,用于标识消息内部分片顺序。 | frg 固定为0 ,不区分分片顺序(仅用于填充数据,无消息内部分片逻辑)。 |
分片策略 | 严格按消息拆分:每个消息独立分片,分片仅属于当前消息,不与其他消息的分片合并。 | 尽量填充分片:发送时会优先填充上一个未填满的分片(利用剩余空间),减少小分片数量。 |
接收行为 | ikcp_recv 一次返回一个完整消息,必须等待该消息的所有分片到达(否则返回 - 2)。 | ikcp_recv 返回尽可能多的连续字节(无需等待完整消息),按用户缓冲区大小返回数据。 |
数据合并 | 不同消息的分片不会合并,接收方严格区分不同ikcp_send 的数据。 | 多个ikcp_send 的数据会被合并为字节流,接收方无法区分原始发送次数。 |
适用场景 | 适合小消息、需要明确消息边界的场景(如游戏指令、协议交互、短消息)。 | 适合大文件传输、字节流交互场景(如文件下载、长连接数据流式传输)。 |
实现细节 | ikcp_send 会为每个消息独立分配分片,frg 按消息内部分片顺序编号。 | ikcp_send 优先复用未填满的分片(减少分片数量),frg 固定为 0,不维护消息内部分片关系。 |
四、值得借鉴的设计结构
1. 柔性数组
struct IKCPSEG
{struct IQUEUEHEAD node; // 链表节点:用于将分片加入发送/接收队列....char data[1]; // 数据缓冲区:柔性数组,存储用户数据(实际长度由len指定)
};
项目 | 具体说明 |
---|---|
柔性数组定义 | C99 标准引入的特性,指结构体最后一个成员为未指定大小的数组(语法可写为data[] 或data[1] ,data[1] 为兼容早期编译器的写法)。 |
柔性数组核心特点 | 1. 不占用结构体固定内存: 2. 位置约束:必须作为结构体的最后一个成员; 3. 内存分配:需通过动态内存分配(如 |
KCP 中数据存储需求 | KCP 分片( - 短消息:1 个分片,数据长度可能仅几十字节; - 长消息:拆分为多个分片,每个分片数据长度接近 MSS(如 1376 字节)。 |
传统方案的缺陷 | 1. 固定大小数组(如 2. 指针(如 |
柔性数组data[1] 的优势 | 1. 动态适配长度:通过 2. 内存连续:结构体元数据( 3. 管理简单:一次 |
2. 通过结构体成员指针反推结构体本身指针
// 计算结构体成员的偏移量:用于从成员指针反向获取结构体指针
#define IOFFSETOF(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)// 从成员指针获取结构体指针(容器_of实现):KCP核心技巧
// 原理:通过成员偏移量计算结构体起始地址,适用于链表节点嵌入其他结构体的场景
#define ICONTAINEROF(ptr, type, member) ( \(type*)( ((char*)((type*)ptr)) - IOFFSETOF(type, member)) )// 从链表节点指针获取包含该节点的结构体指针
#define IQUEUE_ENTRY(ptr, type, member) ICONTAINEROF(ptr, type, member)
项目 | 说明 | 示例 / 原理(结合 KCP 的struct IKCPSEG ) |
---|---|---|
核心原理 | 通过结构体成员的指针,结合该成员在结构体中的偏移量,反向计算出结构体本身的指针。 | 已知成员member 的指针ptr ,结构体起始地址 = ptr - 成员在结构体中的偏移量 。 |
关键宏 1:计算偏移量 | IOFFSETOF(TYPE, MEMBER) :计算成员MEMBER 在结构体TYPE 中的偏移量(字节数)。 | 原理:假设结构体首地址为 0,成员MEMBER 的地址即为偏移量。示例:IOFFSETOF(struct IKCPSEG, node) 计算node 在IKCPSEG 中的偏移量(因node 是第一个成员,偏移量为 0)。 |
关键宏 2:反推结构体指针 | ICONTAINEROF(ptr, type, member) :由成员member 的指针ptr ,反推type 类型结构体的指针。 | 公式: 1. 将成员指针 2. 减去成员偏移量,得到结构体起始地址; 3. 转为 |
KCP 中的典型应用 | 通过链表节点IQUEUEHEAD* 反推所属的IKCPSEG* 分片指针。 |
已知 |
核心价值 | 1. 无需为成员额外存储 “所属结构体指针”,节省内存; 2. 简化链表等数据结构的操作,通过嵌入的成员即可关联到完整结构体; 3. 广泛用于内存敏感场景(如协议栈、内核)。 | KCP 通过该技巧,仅用node 成员即可将IKCPSEG 接入链表,遍历链表时能快速定位分片,避免冗余指针带来的内存开销和管理复杂度。 |
3. hook钩子
// 内存分配/释放钩子(允许用户自定义内存管理,如使用内存池)
static void *(*ikcp_malloc_hook)(size_t) = NULL;
static void (*ikcp_free_hook)(void *) = NULL;// 内部内存分配函数(优先使用用户自定义钩子,否则用系统malloc)
static void *ikcp_malloc(size_t size)
{if (ikcp_malloc_hook)return ikcp_malloc_hook(size);return malloc(size);
}// 内部内存释放函数(优先使用用户自定义钩子,否则用系统free)
static void ikcp_free(void *ptr)
{if (ikcp_free_hook){ikcp_free_hook(ptr);}else{free(ptr);}
}// 重定义内存分配器(供用户设置自定义malloc/free)
void ikcp_allocator(void *(*new_malloc)(size_t), void (*new_free)(void *))
{ikcp_malloc_hook = new_malloc;ikcp_free_hook = new_free;
}
3.1 钩子定义
在 KCP 的内存管理代码中,hook
(钩子)是一种允许用户介入或替换库默认逻辑的机制
3.2 为什么使用钩子?
KCP 作为通用协议库,默认使用系统malloc
/free
管理内存,但不同场景可能有特殊需求(如性能优化、调试跟踪等)。钩子机制允许用户用自定义逻辑替换默认内存操作,而无需修改 KCP 源码,举例:
-
内存池优化:高频内存分配(如 KCP 分片的创建 / 销毁)会导致系统
malloc
/free
性能损耗。用户可实现内存池(预先分配一批内存块,按需分配 / 回收),通过ikcp_allocator
挂钩后,KCP 会使用内存池管理内存,提升性能。 -
内存调试与监控:用户可在自定义
malloc
/free
中添加日志(记录分配大小、地址、调用栈),或检测内存泄漏(如统计分配 / 释放次数是否匹配),帮助调试 KCP 的内存使用问题。 -
跨平台适配:某些嵌入式系统或特殊环境可能不支持标准
malloc
/free
,用户可通过钩子接入平台专用的内存管理接口,确保 KCP 正常运行
3.3 钩子机制的核心优势
优势 | 说明 |
---|---|
无侵入性 | 无需修改 KCP 核心代码,通过外部接口(ikcp_allocator )即可替换逻辑,降低维护成本。 |
灵活性与可扩展性 | 不同用户 / 场景可根据需求定制(如内存池、调试、跨平台),库本身保持通用。 |
默认兼容性 | 未设置钩子时自动使用系统默认逻辑,保证基础功能可用,无需用户额外配置。 |
总结
KCP 中的ikcp_malloc_hook
和ikcp_free_hook
是函数钩子的典型应用:通过函数指针预留 “扩展点”,允许用户在不侵入库核心逻辑的前提下,替换内存管理等关键操作,从而适配不同场景的需求(性能、调试、跨平台等)。这种机制是库设计中 “开闭原则”(对扩展开放、对修改关闭)的经典实践。
4.snd_queue、rcv_queue、snd_buf、rcv_buf设计结构的优势
队列名称 | 所属侧 | 核心功能 | 数据流转(来源→去向) | 设计优势具体体现 |
---|---|---|---|---|
snd_queue | 发送侧 | 暂存用户发送的分片,这些分片未进入发送窗口,等待窗口空闲后被处理 | 用户数据(ikcp_send 分片)→ snd_queue → 符合窗口条件后移至snd_buf (ikcp_flush ) | 1. 解耦用户发送与窗口控制:用户调用 2. 隔离未发送数据:与已进入窗口的 |
snd_buf | 发送侧 | 存储已进入发送窗口的分片,负责重传(超时 / 快速重传)和等待 ACK 确认 | snd_queue → snd_buf → 网络(UDP 发送);确认后从snd_buf 删除(ikcp_parse_ack ) | 1. 集中管理重传逻辑:仅处理已发送未确认的分片,便于统一检测超时和快速重传; 2. 精细流量控制:受 |
rcv_buf | 接收侧 | 暂存从网络收到的分片(可能乱序),等待排序后转移至接收队列 | 网络数据(ikcp_input 解析)→ rcv_buf → 序号连续的分片移至rcv_queue | 1. 处理 UDP 乱序:缓存乱序到达的分片,通过序号排序确保数据连续性; 2. 隔离未就绪数据:与用户可读取的 |
rcv_queue | 接收侧 | 存储已排序且连续的分片,供用户通过ikcp_recv 读取 | rcv_buf (有序分片)→ rcv_queue → 用户(ikcp_recv 读取后删除) | 1. 保障用户数据有序性:仅向用户提供连续完整的数据,屏蔽 UDP 乱序特性; 2. 接收端流量控制:大小受 |
对比传统单队列:设计核心优势总结
传统单队列问题 | 四队列设计的改进 |
---|---|
用户发送需立即判断窗口,窗口满则阻塞 | snd_queue 暂存数据,用户发送无需等待窗口,降低延迟 |
已发送 / 未发送、已接收 / 未接收分片混杂 | 四队列分层管理,状态隔离,逻辑清晰 |
无法精细控制发送速率,易引发拥塞或崩溃 | snd_buf 和 rcv_buf 通过多层窗口限制,实现闭环流量控制 |
乱序与重传逻辑混杂,状态管理复杂 | rcv_buf 处理乱序、snd_buf 处理重传,职责单一 |
总的来说:分层队列设计使逻辑清晰,便于修改单一阶段(如调整拥塞控制策略仅需修改snd_buf
逻辑),且支持多线程优化(分队列加锁)。
五、用户能调用的API
接口函数 | 功能描述 |
---|---|
ikcp_create(IUINT32 conv, void *user) | 创建 KCP 控制块(会话),conv 为会话编号(双方需一致),user 为用户自定义数据指针;返回控制块指针,失败返回 NULL。 |
ikcp_release(ikcpcb *kcp) | 释放 KCP 控制块及所有资源(队列、缓存、ACK 列表等),调用后控制块指针失效。 |
ikcp_setoutput(ikcpcb *kcp, int (*output)(...)) | 设置发送回调函数,KCP 需发送数据时会调用该函数(由用户实现 UDP 底层发送逻辑)。 |
ikcp_recv(ikcpcb *kcp, char *buffer, int len) | 从接收队列(rcv_queue )读取已排序的完整用户数据;buffer 为接收缓冲区,len 为缓冲区最大长度;返回读取的数据长度,失败返回负数(如无数据)。 |
ikcp_send(ikcpcb *kcp, const char *buffer, int len) | 将用户数据加入发送队列(snd_queue )等待分片发送;buffer 为待发送数据,len 为数据长度;返回 0 表示成功,失败返回负数(如窗口满)。 |
ikcp_update(ikcpcb *kcp, IUINT32 current) | 定期更新 KCP 状态(必须调用),处理超时重传、ACK 批量发送、窗口探查等逻辑;current 为当前时间戳(毫秒级)。 |
ikcp_check(const ikcpcb *kcp, IUINT32 current) | 获取下次调用ikcp_update 的时间戳,用于优化调度(减少不必要的频繁调用);返回下次更新时间。 |
ikcp_input(ikcpcb *kcp, const char *data, long size) | 处理底层接收的 UDP 数据,解析为 KCP 分片并处理(数据分片、ACK、窗口探查等);data 为 UDP 数据包,size 为包长度;返回 0 表示成功,失败返回负数。 |
ikcp_flush(ikcpcb *kcp) | 刷新待发送数据:将send_queue 中的消息分片,加入send_buf 并调用发送回调发送;可主动调用或由ikcp_update 自动触发。 |
ikcp_peeksize(const ikcpcb *kcp) | 获取接收队列(rcv_queue )中下一条消息的长度,用于判断缓冲区是否足够;返回消息长度,无消息返回 0。 |
ikcp_setmtu(ikcpcb *kcp, int mtu) | 设置最大传输单元(MTU),自动计算最大分片大小(MSS = MTU - 协议头长度);mtu 最小为 50 字节;返回 0 表示成功。 |
ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd) | 设置发送窗口和接收窗口大小:sndwnd 为本地允许的未确认分片数,rcvwnd 为本地可缓存的未处理分片数;返回 0 表示成功。 |
ikcp_waitsnd(const ikcpcb *kcp) | 获取待发送(含已发送未确认)的分片总数,用于监控发送队列状态;返回分片数量。 |
ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc) | 配置无延迟模式:nodelay 为无延迟开关(1 启用),interval 为刷新间隔(毫秒),resend 为快速重传阈值,nc 为拥塞控制开关(1 关闭);返回 0 表示成功。 |
ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...) | 输出 KCP 日志,通过writelog 回调函数实现;mask 为日志类型掩码(如IKCP_LOG_SEND ),fmt 为格式化字符串。 |
ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*)) | 设置自定义内存分配器,替换默认的malloc 和free (如使用内存池优化性能)。 |
ikcp_getconv(const void *ptr) | 从 UDP 数据包中解析会话编号(conv ),用于匹配对应的 KCP 会话;返回解析出的conv 。 |
六、基于kcp协议实现的简单实例
示例1:客户端不发送心跳,服务器监测到客户端在某个时间内没发信息就断开连接
1. 服务端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/time.h>
#include <errno.h>
#include "ikcp.h"// 最大连接数
#define MAX_CONNECTIONS 1024
// 连接超时时间(30秒)
#define TIMEOUT_MS 30000
#define SHUTDOWN_MSG "KCP_SERVER_SHUTDOWN"// UDP套接字
int udp_socket;// KCP连接结构体(管理单个会话)
typedef struct {uint32_t conv; // 会话标识ikcpcb *kcp; // KCP实例struct sockaddr_in client_addr; // 客户端地址socklen_t client_addr_len; // 地址长度uint32_t last_active; // 最后活动时间(毫秒)
} KcpConnection;// 连接管理数组
KcpConnection connections[MAX_CONNECTIONS];
int conn_count = 0;// 互斥锁(单线程可省略,多线程需添加)
// pthread_mutex_t conn_mutex = PTHREAD_MUTEX_INITIALIZER;// KCP发送回调(使用当前连接的客户端地址)
int kcp_send_cb(const char *buf, int len, ikcpcb *kcp, void *user) {KcpConnection *conn = (KcpConnection *)user;if (sendto(udp_socket, buf, len, 0, (struct sockaddr*)&conn->client_addr, conn->client_addr_len) < 0) {perror("sendto failed");return -1;}return len;
}// 获取当前毫秒时间
uint32_t current_ms() {struct timeval tv;gettimeofday(&tv, NULL);return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}// 查找或创建KCP连接(根据conv)
KcpConnection* find_or_create_connection(uint32_t conv, struct sockaddr_in *client_addr, socklen_t addr_len) {// 查找现有连接for (int i = 0; i < conn_count; i++) {if (connections[i].conv == conv) {// 更新最后活动时间connections[i].last_active = current_ms();return &connections[i];}}// 超过最大连接数,返回NULLif (conn_count >= MAX_CONNECTIONS) {fprintf(stderr, "Too many connections (max: %d)\n", MAX_CONNECTIONS);return NULL;}// 创建新连接KcpConnection *conn = &connections[conn_count++];conn->conv = conv;conn->kcp = ikcp_create(conv, conn); // user参数绑定当前连接if (!conn->kcp) {fprintf(stderr, "Failed to create KCP instance for conv: %u\n", conv);conn_count--;return NULL;}// 初始化KCP参数conn->kcp->output = kcp_send_cb;ikcp_nodelay(conn->kcp, 1, 10, 2, 1); // 快速模式ikcp_wndsize(conn->kcp, 128, 128); // 窗口大小// 保存客户端地址memcpy(&conn->client_addr, client_addr, addr_len);conn->client_addr_len = addr_len;conn->last_active = current_ms();printf("New connection created, conv: %u\n", conv);return conn;
}// 清理超时连接
void cleanup_timeout_connections() {uint32_t now = current_ms();for (int i = 0; i < conn_count; ) {if (now - connections[i].last_active > TIMEOUT_MS) {printf("Connection conv: %u timeout, closed\n", connections[i].conv);ikcp_send(connections[i].kcp, SHUTDOWN_MSG, strlen(SHUTDOWN_MSG)); // 可选:通知客户端ikcp_flush(connections[i].kcp); // 立即发送数据ikcp_release(connections[i].kcp); // 释放KCP实例// 用最后一个连接覆盖当前位置,减少数组移动if (i < conn_count - 1) {connections[i] = connections[conn_count - 1];}conn_count--;} else {i++;}}
}int main(int argc, char *argv[]) {if (argc != 2) {fprintf(stderr, "Usage: %s <port>\n", argv[0]);return 1;}int port = atoi(argv[1]);// 创建UDP套接字if ((udp_socket = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {perror("socket creation failed");return 1;}// 绑定地址和端口struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(port);if (bind(udp_socket, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {perror("bind failed");close(udp_socket);return 1;}printf("KCP server started on port %d (dynamic conv)\n", port);char udp_buf[4096];uint32_t current_time, last_update = current_ms();while (1) {current_time = current_ms();// 每10ms更新所有KCP实例状态if (current_time - last_update >= 10) {for (int i = 0; i < conn_count; i++) {ikcp_update(connections[i].kcp, current_time);}last_update = current_time;// 定期清理超时连接cleanup_timeout_connections();}// 非阻塞接收UDP数据struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);ssize_t n = recvfrom(udp_socket, udp_buf, sizeof(udp_buf), MSG_DONTWAIT, (struct sockaddr*)&client_addr, &client_addr_len);if (n > 0) {// KCP数据包至少包含4字节conv,否则忽略if (n < 4) {fprintf(stderr, "Invalid KCP packet (too short)\n");continue;}// 从KCP头部解析conv(前4字节,小端序)uint32_t conv = *(uint32_t*)udp_buf;// 查找或创建连接KcpConnection *conn = find_or_create_connection(conv, &client_addr, client_addr_len);if (!conn) continue;// 将UDP数据输入到KCPif (ikcp_input(conn->kcp, udp_buf, n) != 0) {fprintf(stderr, "ikcp_input failed for conv: %u\n", conv);continue;}// 从KCP中提取有效数据char kcp_recv_buf[4096];int recv_len;while ((recv_len = ikcp_recv(conn->kcp, kcp_recv_buf, sizeof(kcp_recv_buf))) > 0) {kcp_recv_buf[recv_len] = '\0';printf("Received from conv %u: %s\n", conv, kcp_recv_buf);// 回显数据给客户端ikcp_send(conn->kcp, kcp_recv_buf, recv_len);}}usleep(1000); // 降低CPU占用}// 清理资源(实际运行中不会执行到这里)for (int i = 0; i < conn_count; i++) {ikcp_release(connections[i].kcp);}close(udp_socket);return 0;
}
2. 客户端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/time.h>
#include <errno.h>
#include "ikcp.h"#define SHUTDOWN_MSG "KCP_SERVER_SHUTDOWN"// UDP套接字和服务端地址
int udp_socket;
struct sockaddr_in server_addr;
socklen_t server_addr_len = sizeof(server_addr);// KCP发送回调
int kcp_send_cb(const char *buf, int len, ikcpcb *kcp, void *user)
{if (sendto(udp_socket, buf, len, 0,(struct sockaddr *)&server_addr, server_addr_len) < 0){perror("sendto failed");return -1;}return len;
}// 获取当前毫秒时间
uint32_t current_ms()
{struct timeval tv;gettimeofday(&tv, NULL);return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}int main(int argc, char *argv[])
{if (argc != 3){fprintf(stderr, "Usage: %s <server_ip> <port>\n", argv[0]);return 1;}// 随机生成conv(实际应用中可使用更复杂的生成逻辑)srand(time(NULL));uint32_t conv = rand();printf("Client conv: %u\n", conv);// 创建UDP套接字if ((udp_socket = socket(AF_INET, SOCK_DGRAM, 0)) < 0){perror("socket creation failed");return 1;}// 设置服务端地址memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(atoi(argv[2]));if (inet_pton(AF_INET, argv[1], &server_addr.sin_addr) <= 0){perror("invalid address");close(udp_socket);return 1;}// 初始化KCP(使用随机生成的conv)ikcpcb *kcp = ikcp_create(conv, NULL);if (!kcp){fprintf(stderr, "Failed to create KCP instance\n");close(udp_socket);return 1;}// 配置KCP参数kcp->output = kcp_send_cb;ikcp_nodelay(kcp, 1, 10, 2, 1); // 快速模式ikcp_wndsize(kcp, 128, 128); // 窗口大小printf("Connected to %s:%s, enter messages to send (Ctrl+C to exit)\n", argv[1], argv[2]);char input_buf[1024];char udp_buf[4096];char kcp_recv_buf[4096];uint32_t last_update = current_ms();// 用select处理输入和接收fd_set read_fds;struct timeval tv;int max_fd = udp_socket + 1;while (1){FD_ZERO(&read_fds);FD_SET(STDIN_FILENO, &read_fds);FD_SET(udp_socket, &read_fds);// 超时10ms,用于KCP定期更新tv.tv_sec = 0;tv.tv_usec = 10000;int activity = select(max_fd, &read_fds, NULL, NULL, &tv);if (activity < 0 && errno != EINTR){perror("select error");break;}// 更新KCP状态ikcp_update(kcp, current_ms());// 处理用户输入if (FD_ISSET(STDIN_FILENO, &read_fds)){if (fgets(input_buf, sizeof(input_buf), stdin) != NULL){size_t len = strlen(input_buf);if (len > 0 && input_buf[len - 1] == '\n'){input_buf[len - 1] = '\0'; // 移除换行符len--;}if (len > 0){ikcp_send(kcp, input_buf, len);printf("Sent: %s\n", input_buf);}}}// 处理服务端回复if (FD_ISSET(udp_socket, &read_fds)){ssize_t n = recvfrom(udp_socket, udp_buf, sizeof(udp_buf), 0, NULL, NULL);if (n > 0){ikcp_input(kcp, udp_buf, n); // 输入到KCP// 提取有效数据int recv_len;while ((recv_len = ikcp_recv(kcp, kcp_recv_buf, sizeof(kcp_recv_buf))) > 0){kcp_recv_buf[recv_len] = '\0';if (strcmp(kcp_recv_buf, SHUTDOWN_MSG) == 0){printf("Connection timed out by server.\n");ikcp_release(kcp);close(udp_socket);return 0;}printf("Received from server: %s\n", kcp_recv_buf);}}}}// 清理资源ikcp_release(kcp);close(udp_socket);return 0;
}
示例2:客户端定期发送心跳PING,服务端回应PONG,在一定时间内客户端或服务端没收到对方的心跳或连接,就断开连接。
1. 服务端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <errno.h>
#include <signal.h>
#include "ikcp.h"#define MAX_CONNECTIONS 1024
#define TIMEOUT_MS 30000
#define SHUTDOWN_MSG "KCP_SERVER_SHUTDOWN" // 断开通知标识int udp_socket;
int server_running = 1; // 服务端运行状态typedef struct {uint32_t conv;ikcpcb *kcp;struct sockaddr_in client_addr;socklen_t client_addr_len;uint32_t last_active;
} KcpConnection;KcpConnection connections[MAX_CONNECTIONS];
int conn_count = 0;// 发送回调(同前)
int kcp_send_cb(const char *buf, int len, ikcpcb *kcp, void *user) {KcpConnection *conn = (KcpConnection *)user;if (sendto(udp_socket, buf, len, 0, (struct sockaddr*)&conn->client_addr, conn->client_addr_len) < 0) {return -1;}return len;
}// 获取当前毫秒时间(同前)
uint32_t current_ms() {struct timeval tv;gettimeofday(&tv, NULL);return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}// 查找连接(同前)
KcpConnection* find_connection(uint32_t conv) {for (int i = 0; i < conn_count; i++) {if (connections[i].conv == conv) {return &connections[i];}}return NULL;
}// 创建连接(同前)
KcpConnection* create_connection(uint32_t conv, struct sockaddr_in *client_addr, socklen_t addr_len) {if (conn_count >= MAX_CONNECTIONS) return NULL;KcpConnection *conn = &connections[conn_count++];conn->conv = conv;conn->kcp = ikcp_create(conv, conn);if (!conn->kcp) { conn_count--; return NULL; }conn->kcp->output = kcp_send_cb;ikcp_nodelay(conn->kcp, 1, 10, 2, 1);ikcp_wndsize(conn->kcp, 128, 128);memcpy(&conn->client_addr, client_addr, addr_len);conn->client_addr_len = addr_len;conn->last_active = current_ms();printf("New connection, conv: %u\n", conv);return conn;
}// 清理超时连接(同前)
void cleanup_timeout_connections() {uint32_t now = current_ms();for (int i = 0; i < conn_count; ) {if (now - connections[i].last_active > TIMEOUT_MS) {printf("Connection conv: %u timeout\n", connections[i].conv);ikcp_release(connections[i].kcp);if (i < conn_count - 1) connections[i] = connections[conn_count - 1];conn_count--;} else {i++;}}
}// 服务器关闭处理函数:发送断开通知
void server_shutdown() {server_running = 0;printf("\nServer shutting down, notifying clients...\n");// 向所有连接发送断开通知for (int i = 0; i < conn_count; i++) {KcpConnection *conn = &connections[i];// 发送断开标识ikcp_send(conn->kcp, SHUTDOWN_MSG, strlen(SHUTDOWN_MSG));// 强制刷新KCP缓冲区,确保消息发送ikcp_flush(conn->kcp);printf("Notified client conv: %u\n", conn->conv);}// 短暂等待,确保消息发送完成(UDP可能丢包,多次尝试)usleep(100000); // 100msfor (int i = 0; i < conn_count; i++) {KcpConnection *conn = &connections[i];ikcp_send(conn->kcp, SHUTDOWN_MSG, strlen(SHUTDOWN_MSG));ikcp_flush(conn->kcp);}// 释放资源for (int i = 0; i < conn_count; i++) {ikcp_release(connections[i].kcp);}close(udp_socket);printf("Server closed\n");exit(0);
}// 捕获Ctrl+C信号,触发优雅关闭
void handle_signal(int sig) {if (sig == SIGINT) {server_shutdown();}
}int main(int argc, char *argv[]) {if (argc != 2) {fprintf(stderr, "Usage: %s <port>\n", argv[0]);return 1;}signal(SIGINT, handle_signal); // 注册信号处理int port = atoi(argv[1]);if ((udp_socket = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {perror("socket failed");return 1;}struct sockaddr_in server_addr = {0};server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(port);if (bind(udp_socket, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {perror("bind failed");close(udp_socket);return 1;}printf("KCP server (port %d) running, press Ctrl+C to exit\n", port);char udp_buf[4096];uint32_t last_update = current_ms();while (server_running) {uint32_t now = current_ms();// 每10ms更新KCP状态if (now - last_update >= 10) {for (int i = 0; i < conn_count; i++) {ikcp_update(connections[i].kcp, now);}last_update = now;cleanup_timeout_connections();}// 接收UDP数据struct sockaddr_in client_addr;socklen_t addr_len = sizeof(client_addr);ssize_t n = recvfrom(udp_socket, udp_buf, sizeof(udp_buf), MSG_DONTWAIT, (struct sockaddr*)&client_addr, &addr_len);if (n > 0) {if (n < 4) continue; // 无效KCP包uint32_t conv = *(uint32_t*)udp_buf;KcpConnection *conn = find_connection(conv);if (!conn) {conn = create_connection(conv, &client_addr, addr_len);if (!conn) continue;}conn->last_active = now;// 处理KCP输入if (ikcp_input(conn->kcp, udp_buf, n) != 0) continue;// 接收客户端数据(包含心跳)char kcp_recv_buf[4096];int recv_len;while ((recv_len = ikcp_recv(conn->kcp, kcp_recv_buf, sizeof(kcp_recv_buf))) > 0) {kcp_recv_buf[recv_len] = '\0';printf("conv %u: %s\n", conv, kcp_recv_buf);// 若客户端发送心跳,回应心跳if (strcmp(kcp_recv_buf, "PING") == 0) {ikcp_send(conn->kcp, "PONG", 4);} else {// 普通消息回显ikcp_send(conn->kcp, kcp_recv_buf, recv_len);}}}usleep(1000);}server_shutdown();return 0;
}
2. 客户端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/time.h>
#include <errno.h>
#include "ikcp.h"#define SHUTDOWN_MSG "KCP_SERVER_SHUTDOWN" // 服务器断开标识
#define HEARTBEAT_INTERVAL 5000 // 心跳间隔(5秒)
#define TIMEOUT_MS 10000 // 超时时间(10秒,超过则判定断开)int udp_socket;
struct sockaddr_in server_addr;
socklen_t server_addr_len = sizeof(server_addr);
uint32_t last_recv_time; // 最后一次收到服务器数据的时间// 发送回调(同前)
int kcp_send_cb(const char *buf, int len, ikcpcb *kcp, void *user) {if (sendto(udp_socket, buf, len, 0, (struct sockaddr*)&server_addr, server_addr_len) < 0) {perror("sendto failed");return -1;}return len;
}// 获取当前毫秒时间(同前)
uint32_t current_ms() {struct timeval tv;gettimeofday(&tv, NULL);return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}int main(int argc, char *argv[]) {if (argc != 3) {fprintf(stderr, "Usage: %s <server_ip> <port>\n", argv[0]);return 1;}// 随机生成convsrand(time(NULL));uint32_t conv = rand();printf("Client conv: %u\n", conv);// 创建UDP套接字if ((udp_socket = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {perror("socket failed");return 1;}// 初始化服务器地址memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(atoi(argv[2]));if (inet_pton(AF_INET, argv[1], &server_addr.sin_addr) <= 0) {perror("invalid address");close(udp_socket);return 1;}// 初始化KCPikcpcb *kcp = ikcp_create(conv, NULL);if (!kcp) {fprintf(stderr, "KCP create failed\n");close(udp_socket);return 1;}kcp->output = kcp_send_cb;ikcp_nodelay(kcp, 1, 10, 2, 1);ikcp_wndsize(kcp, 128, 128);printf("Connected to %s:%s, enter messages (Ctrl+C to exit)\n", argv[1], argv[2]);char input_buf[1024];char udp_buf[4096];char kcp_recv_buf[4096];uint32_t last_heartbeat = current_ms(); // 最后一次发送心跳的时间last_recv_time = current_ms(); // 初始化最后接收时间fd_set read_fds;struct timeval tv;int max_fd = udp_socket + 1;int running = 1;while (running) {FD_ZERO(&read_fds);FD_SET(STDIN_FILENO, &read_fds);FD_SET(udp_socket, &read_fds);// 超时10ms,用于定期任务tv.tv_sec = 0;tv.tv_usec = 10000;int activity = select(max_fd, &read_fds, NULL, NULL, &tv);if (activity < 0 && errno != EINTR) {perror("select error");break;}uint32_t now = current_ms();ikcp_update(kcp, now);// 定期发送心跳if (now - last_heartbeat >= HEARTBEAT_INTERVAL) {ikcp_send(kcp, "PING", 4);last_heartbeat = now;// printf("Sent heartbeat\n");}// 检测服务器超时(长时间未收到数据)if (now - last_recv_time > TIMEOUT_MS) {printf("\nServer disconnected (timeout)\n");running = 0;break;}// 处理用户输入if (FD_ISSET(STDIN_FILENO, &read_fds)) {if (fgets(input_buf, sizeof(input_buf), stdin) != NULL) {size_t len = strlen(input_buf);if (len > 0 && input_buf[len-1] == '\n') {input_buf[len-1] = '\0';len--;}if (len > 0) {ikcp_send(kcp, input_buf, len);printf("Sent: %s\n", input_buf);}}}// 处理服务器数据if (FD_ISSET(udp_socket, &read_fds)) {ssize_t n = recvfrom(udp_socket, udp_buf, sizeof(udp_buf), 0, NULL, NULL);if (n > 0) {last_recv_time = now; // 更新最后接收时间ikcp_input(kcp, udp_buf, n);int recv_len;while ((recv_len = ikcp_recv(kcp, kcp_recv_buf, sizeof(kcp_recv_buf))) > 0) {kcp_recv_buf[recv_len] = '\0';// 检测服务器主动断开通知if (strcmp(kcp_recv_buf, SHUTDOWN_MSG) == 0) {printf("\nServer notified shutdown\n");running = 0;break;}// 处理心跳回应if (strcmp(kcp_recv_buf, "PONG") == 0) {// printf("Received heartbeat ack\n");} else {printf("Received: %s\n", kcp_recv_buf);}}if (!running) break;}}}// 清理资源ikcp_release(kcp);close(udp_socket);printf("Client exited\n");return 0;
}