传输协议和消息队列
一.消息队列
消息队列(Message Queue,简称 MQ)是一种进程间通信(IPC)机制,用于在分布式系统或多线程/多进程程序中传递消息。通过提供 消息传递 和 消息排队 模型,它可以在 分布式环境 下提供 应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步 等等功能。常见的消息队列有 Kafka、RabbitMQ、RocketMQ、ActiveMQ 等。
1. 解耦: 模块之间不直接通信,减少耦合度。
2. 异步处理: 可以先把请求丢进队列里,慢慢处理,提高系统响应速度。
3. 削峰填谷: 高并发时排队处理,避免系统被压垮。
4. 失败恢复: 消息未消费前可以保留,实现容错。
二.防止消息丢失
以kafka为例:
在 Kafka 中有一个重要的概念叫偏移量(Offset),它是消息在分区中的编号,用来唯一标识该分区里的一条消息的位置。 Kafka 中的消息是按“主题(Topic) → 分区(Partition)”来组织的,每条消息在分区中都有一个递增的数字编号,这就是偏移量。 消费者读取分区里的消息时,就是从某个 offset 开始往后读,然后手动或自动告诉 Kafka 读到哪里的数据了。
1.消费者丢失数据
在kafka中, offset 是默认提交的,但是如果消费者自动提交了offset,让 Kafka 以为已经消费好了这个消息,但其实才刚准备处理这个消息,客户端就挂了,此时这条消息就丢失了,我们只需要在消费者端改成在处理完业务后手动提交 offset 即可。
但此时我们又会产生重复消费数据的可能,下面说。
2.Kafka 丢失数据
首先,我们要知道,Kafka 为每个分区(Partition)支持多副本(replica)机制:每个分区有一个 Leader 和若干个 Follower。生产者写入数据时,只写入 Leader。Follower 会从 Leader 同步数据(副本)。
如果 Kafka 某个分区的leader宕机,然后重新选举 leader。如果此时其他的 follower 刚好还有些数据没有同步,然后选举某个 follower 成 leader 之后,就会丢失一些数据。
我们要可以设置以下参数:
1.replication.factor : 给 topic 设置该参数,这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
2.min.insync.replicas : 在 Kafka 服务端设置该参数,这个只必须大于1至少几个副本必须同步成功,才算“写入成功”。
3.acks=all : 在生产者设置改参数,生产者等待“所有同步副本”都写成功,才能认为是写成功了。
4.retries=3 : 在生产者设置改参数,如果一旦写入失败,则重试。
3.生产者丢失数据
开启生产确认机制(acks),设置 acks=all,要求所有副本写入成功才返回,并开启重试机制。但这也可能引写重复的问题,下面说。
三.消息的幂等性
幂等性:相同的操作执行一次或多次,效果都一样。
在整个数据流转过程中,可能会发生以下情况:
1. 生产者层面的幂等性(防止写重复)
Kafka 生产者幂等性问题的本质原因在于:消息可能被“重复发送”到 Kafka Broker,而 Kafka 默认是允许重复写入的,如果没做“去重”或“确认机制”,在重试的时候就可能写了两条一模一样的消息。
Kafka 从 0.11 开始支持幂等生产者机制。
enable.idempotence=true
acks=all
retries > 0
Kafka 会为每个 producer 分配一个 producerId 和 sequence number;若发送同一条消息多次,Kafka 自动识别并去重;同一条消息在 broker 上 只会被写入一次。
注意:
1. 幂等性不是“去重”,而是防止“写重复”;
2. 幂等性保证的是同一个 producer 的“同一条消息”不会被 Kafka 多次写入;
3. 多个 producer(或重启时 ID 变了)就无效了。
2. 消费者重复消费
上面我们说,在消费端我们手动提交 offset,这样的话,在处理完数据之后,提交数据之前突然程序崩溃,那么我们实际已经消费了这条消息,但是kafka没有收到offset,所以在重启消费者后,会重复发生该数据,就会导致重复消费了。
解决方案:
1. 使用唯一消息 ID + 幂等判断
每条消息携带唯一业务 ID(比如订单号、消息流水号),消费时,先查数据库是否已处理过该 ID, 没处理就执行业务 + 记录已处理状态,如果已处理,直接跳过。如支付、扣款、积分发放等业务。
2. 采用唯一约束
为数据库加上唯一约束字段。或者增加一张消费记录表,表字段加上唯一约束条件(UNIQUE),消费完之后就往表里插入一条数据,防止重复插入。
3.消费写入使用数据库的 UPSERT 或 ON DUPLICATE KEY
例如 mysql 中使用 ON DUPLICATE KEY UPDATE
INSERT INTO user_bonus (user_id, bonus) VALUES ('u123', 100) ON DUPLICATE KEY UPDATE bonus = bonus;
如果发现有唯一约束冲突,则进行更新操作。
4.使用 redis 做幂等锁
幂等锁是一种防止同一请求/消息被重复处理的机制,是构建高并发下可靠幂等机制的关键手段之一,尤其在分布式场景下。
4.1 SETNX 实现幂等锁
SETNX key value
如果 key 不存在,就设置它,返回 true;如果已存在,就啥也不做,返回 false。
4.2 使用 Lua 脚本
如果需要执行多条redis指令,则可能发生线程安全的问题,这时可以采用Lua脚本。
Redis 会把整个 Lua 脚本当作一条命令执行,执行过程不会被中断、不会被其他命令插入,具备原子性。
四.Kafka保证消息有序性
Kafka 在每个分区内部保证消息是按照写入顺序被追加到日志文件中的。消费者按 offset 顺序读取,自然就能获取有序的消息。但是跨分区的情况下是不能保证有序的。
1. 生产者端:每条消息会带一个 key,比如订单号、用户 ID、会话 ID。Kafka 根据 key 的哈希值分配到特定 partition。每条具有相同的 key 的数据,会一直被发到同一个 partition,因此保持顺序。但要注意以单线程的方式写入。
2. Broker 端:消息会被顺序地写入相应的 partition, 每条消息都有一个递增的 offset。
3. 消费者端:按 offset 顺序读取消费。每个 partition 通常由一个消费者来处理,以保证顺序性。
跨分区的情况下,只能在业务层自己做排序了,如利用时间戳放在有序队列中,或者使用外部排序软件,如 Flink、Spark 等。
五.消息传输协议和队列
WebSocket、MQTT、Kafka、RabbitMQ、RocketMQ、ActiveMQ 虽然它们都与消息传递、实时通信或异步处理有关,但实际上它们处在不同的技术层面和使用场景。
1.WebSocket:一种通信协议,用于客户端和服务端之间的实时通信(比如网页聊天室)。
2.MQTT(Message Queuing Telemetry Transport): 是一种轻量级、发布/订阅(Pub/Sub)模式的消息协议,专为低带宽、不稳定网络环境设计,广泛用于物联网(IoT)、移动设备、智能家居等场景。
3.Kafka/RabbitMQ/RocketMQ/ActiveMQ:是消息中间件系统,用于分布式系统之间异步、可靠、高吞吐量的消息传递。
特性/工具 | WebSocket | MQTT | Kafka | RabbitMQ | RocketMQ | ActiveMQ |
---|---|---|---|---|---|---|
本质 | 通信协议 | 协议(轻量消息传输) | 分布式消息队列系统(日志系统) | 消息队列系统(面向可靠性) | 分布式消息队列系统(阿里) | 传统消息队列系统(JMS) |
使用场景 | 实时双向通信,低延迟 | 单向推送为主(客户端订阅) | 日志采集、指标流、流处理、高吞吐 | 任务调度、微服务解耦 | 分布式事务、金融、电商下单场景 | 企业应用集成、JMS兼容 |
消息模型 | 非消息队列 | 发布-订阅 | 发布-订阅 | 生产者-消费者(队列+主题) | 类似 Kafka,增强事务支持 | 点对点、发布订阅 |
传输协议 | TCP | TCP | 自定义协议(基于TCP) | AMQP(支持多协议) | 自定义协议 | OpenWire(也支持MQTT/AMQP等) |
持久化支持 | 不持久化 | 可选,依赖 QoS 配置 | 高效持久化(磁盘+零拷贝) | 可配置持久化 | 高性能持久化 | 可配置 |
实时性 | 极佳(毫秒级) | 非常好 | 延迟可接受(批量处理优先) | 较好 | 良好 | 一般 |
吞吐量 | 一般(每连接独立) | 中等偏低(低功耗、低延迟、小负载) | 超高(百万级 TPS) | 中等(万级) | 较高 | 较低 |
事务支持 | 不支持 | 不支持 | 弱事务(Exactly-once很难) | 支持事务 | 强事务支持(分布式事务) | 支持 |
部署复杂度 | 低 | 低(有QoS 2,但并不适用于复杂事务) | 高 | 低至中 | 中等 | 中高 |
适配网络 | 需长连接 | 非常好(2G/3G/卫星) | 要求网络质量高 | 中等 | 中等 | 中等 |
典型使用场景:
技术选型 | 场景 | 原因 |
---|---|---|
WebSocket | 网页聊天室 / 实时推送 / 游戏服务 | WebSocket 保持双向连接、低延迟 |
MQTT | 智能家居 / 传感器网络 / 远程设备 | 网络差、设备资源少,用 MQTT 非常合适 |
Kafka | 大数据日志收集 / 链路追踪 / 流处理平台 | 高吞吐、分区并行、顺序性好 |
RabbitMQ | 微服务异步通信 / 任务队列 / 异步任务处理 / RPC解耦 / 分布式微服务 | 轻量级,易用性强,支持多种协议 |
RocketMQ | 高并发订单处理 / 分布式事务 / 金融场景 | 支持消息顺序、事务、延迟等复杂功能 |
ActiveMQ | 企业集成 / 兼容 JMS 老系统 | JMS 支持好,适合传统业务 |
六.WebSocket 消息可靠性
1.防止消息丢失
WebSocket 本身是基于 TCP 的,因此它天然具备 可靠传输 机制(如 数据包确认、重传),但这并不意味着 WebSocket 一定不会丢消息。消息丢失的常见原因包括:
1.网络断连(导致消息未能成功送达)
2.服务器或客户端崩溃(导致消息未处理或未存储)
为了确保 WebSocket 消息不丢失,可以采取以下措施:
1. 客户端 ACK 机制
1.服务器发送消息时附加唯一 ID(如 message_id)。
2.客户端收到消息后,主动发送 ACK(确认收到该 message_id)。
3.服务器维护一个已发送但未确认的消息队列,若在一定时间内未收到 ACK,则重发该消息。
2. 断线重连与消息恢复
1.客户端检测 WebSocket 断开时自动重连(可使用 指数退避策略 避免频繁重连)。
2.服务器维护一个消息队列,对于已发送但未确认的消息,在客户端重连后重新推送。
3. WebSocket + 消息队列
在 高并发环境(例如 负载均衡后多个 WebSocket 服务器),可以使用消息队列(Kafka / RabbitMQ / Redis Stream) 确保消息可靠投递:
1.客户端 WebSocket 连接到不同服务器节点。
2.所有消息先进入消息队列(MQ),保证存储可靠性。
3.WebSocket 服务器从 MQ 中拉取数据后推送给客户端。
4.客户端 ACK 机制确保消息不会丢失。
4. WebSocket 分片传输
为防止大消息丢失,WebSocket 默认支持数据分片(fragmentation)。对于大数据包,建议:
1.开启 WebSocket 分片,防止因单个大消息丢失导致整个传输失败。
2.客户端拼接数据,确保完整收到。
2.防止消息堆积
WebSocket 消息堆积(Backpressure)通常发生在 消息生产速度 > 消费速度 时,导致服务器或客户端缓存过多数据,从而增加延迟、占用大量内存,甚至导致崩溃。
常见的消息堆积原因:
1.客户端处理速度太慢(如 UI 渲染卡顿、消息处理逻辑过重)。
2.网络延迟或不稳定(导致 TCP 发送窗口收缩)。
3.服务器推送速度过快(高频率推送数据,客户端处理不过来)。
4.未进行消息丢弃或限流处理(所有消息都缓存,导致堆积)。
解决方案:
1.采用消息丢弃策略
适用于 实时性高、但不要求每条消息都被处理的场景(如股票行情、弹幕系统)。
1.只保留最新消息,丢弃旧消息,防止堆积历史数据。
2.基于 Sliding Window(滑动窗口)策略,让客户端始终处理最新的数据,而不是堆积消息。
2. 客户端阻塞队列
适用于服务器端消息推送过快,客户端来不及处理的情况。
客户端添加阻塞队列,将数据先放入阻塞队列中, 然后客户端根据实际业务拉取数据。
3.使用消息队列进行缓冲
适用于 高并发 WebSocket 服务器,避免瞬时流量过载。
1.使用 Kafka / RabbitMQ / Redis Stream 作为消息缓冲层。
2.让 WebSocket 服务器从 MQ 拉取消息,避免消息直接推送导致积压。
4. 客户端批量拉取(Pull)模式
适用于 数据量大、实时性要求较低的场景(如日志推送)。
1.改为客户端定期拉取(Pull),而不是服务器主动推送(Push),需要依靠自己在服务端代码中实现。
2.减少每次 WebSocket 传输数据量,比如 客户端只拉取未读数据。
5. 使用数据压缩
适用于 消息体积过大,导致带宽占用过多的情况。
1.开启 WebSocket 的 permessage-deflate 压缩,减少传输数据大小。
2.在消息层面压缩数据,如 JSON 压缩、Protobuf、MessagePack 等。
七. HTTP 协议
HTTP 协议的全称是 HyperText Transfer Protocol(超文本传输协议),它是基于 TCP/IP 之上的应用层协议,用于客户端(通常是浏览器)和服务器之间传输文本、图片、视频(超文本)等资源。
1. HTTP 特性
1.无状态: 每个请求都是独立的,服务器不会记住上一个请求发生了什么。
2.基于请求/响应: 客户端主动发起请求,服务器被动响应。
3.明文传输(HTTP): HTTP 本身是明文的,数据不加密,容易被中间人监听,但可以采用HTTP + SSL/TLS (HTTPS)加密层,更安全,防窃听、防篡改。
4.可扩展性强: 通过增加 Header 可以扩展很多功能,如认证、缓存、代理控制等,可以利用Session、Token、Cookie 等机制变为"有状态"。
2.HTTP 的长连接和短连接
HTTP 长连接和短连接是指:客户端和服务器之间的 TCP 连接的保持时长和复用方式不同。
HTTP 的“长连接”机制,其实是基于传输层 TCP 的长连接实现的!HTTP 是一个应用层协议,它本身并不负责建立连接。它只是通过设置一些头部字段(如 Connection: keep-alive)告诉 TCP 层“我希望复用连接”。而实际的连接管理、复用、保持活动等操作,都是由TCP(传输层)来负责的。
1.短连接: 每一次请求响应之后就立即断开连接。下一次请求还要重新建立一次 TCP 连接(三次握手)和断开连接(四次挥手)。HTTP/1.0 默认短连接。适用于请求极少或对连接复用要求不高的场景
2.长连接(Keep-Alive): 建立一次 TCP 连接后,可以复用这条连接来发送多个 HTTP 请求。连接不会马上关闭,保持一段时间。HTTP/1.1 开始默认长连接。适用于高并发系统、大量请求场景(如网页加载)
小注:我们通过 curl -v 接口的方式,查看我们开发的接口是不是长连接模式
3.HTTP/1.0、HTTP/1.1、HTTP/2 和 HTTP/3区别
对比项 | HTTP/1.0 | HTTP/1.1 | HTTP/2 | HTTP/3 |
---|---|---|---|---|
发布年份 | 1996 | 1997 | 2015 | 2022 |
连接方式 | 每次请求新建 TCP 连接 | 支持 Keep-Alive 长连接 | 单个 TCP 连接支持并发请求(多路复用) | 基于 UDP 的 QUIC 协议,天然支持并发 |
多路复用 | 不支持 | 不支持,串行处理 | 支持 Stream 多路复用 | 更高效的多路复用(无 TCP 队头阻塞) |
编码格式 | 纯文本 | 纯文本 | 二进制帧结构 | 二进制帧结构 |
请求并发 | 一个连接一次只能传一个请求 | 串行排队传多个请求 | 多个请求并发(每个 Stream 有 ID) | 多个请求并发,且每个 Stream 独立 |
队头阻塞 | 严重 | 严重 | TCP 层可能发生 | 基于 QUIC,无队头阻塞 |
头部压缩 | 无 | 无 | HPACK 压缩 | QPACK 压缩(优化丢包处理) |
性能提升机制 | 无 | 管线化(Pipeline),效果差 | 多路复用、压缩、服务器推送 | 所有 HTTP/2 优化 + 低延迟 + 0-RTT |
加密支持 | 无 | 可选(依赖 HTTPS) | 强烈推荐使用 TLS | 内建 TLS 1.3(加密是基础) |
服务端推送 | 无 | 无 | 支持 Server Push | 支持 Server Push(但使用受限) |
TCP/UDP 协议 | TCP | TCP | TCP | QUIC(基于 UDP) |
建连延迟 | 高,每次新建连接 | 中,支持连接复用 | 中,但仍有 TCP/TLS 握手 | 最低,支持 0-RTT |
丢包处理 | TCP 重传 | TCP 重传 | TCP 重传(会影响所有流) | 每个流独立丢包重传,不互相影响 |
支持情况 | 老旧系统、教学用途 | 依然广泛使用 | 主流浏览器和 CDN 均支持 | 逐步发展部署,现代浏览器已支持 |
典型使用场景 | 老旧 HTTP 客户端 | 中小型 Web 服务 | 现代 Web、API、高并发系统 | 视频、移动网络、CDN、低延迟需求场景 |
八. 接口幂等性
1. 什么是接口幂等性(Idempotency)?
一个接口在被客户端调用一次和多次时,产生的效果是一样的,不会因为多次调用造成多次执行的副作用。
2. 哪些接口必须幂等?
有些接口一旦重复调用,就可能引起严重后果,比如数据重复、资金重复扣款、状态错乱、安全漏洞(如重复发券)等问题,因此有些接口必须具有幂等性。这些问题经常出现在用户重复操作、网络抖动的场景(如客户端重试)
1.支付接口 : 重复扣款,直接经济损失。
2.订单创建接口 : 重复下单,库存混乱。
3.发放优惠券/积分接口 : 重复发放,用户领取。
4.账号注册 / 绑定接口 : 重复账号、账号绑定混乱。
5.提现、转账接口 : 重复执行,造成资金泄漏。
6.消息推送接口(短信、邮件) : 多次通知,骚扰用户,浪费资源。
7.评论/点赞接口(视业务定制) : 重复点赞或灌水式评论。
3.天然具有幂等性接口
有些接口哪怕调用多次,结果也不会改变系统状态或产生副作用。这是在设计上带来的幂等性,也叫弱幂等接口。
1.查询类接口 : 查询特定信息,纯读操作,不会改变数据状态。
2.状态设置类接口 : 比如“设置开启状态”,重复设值没副作用。
3.幂等删除接口 : 如 DELETE 特定id的数据,即使删除多次,最后系统仍是删除此数据状态。
4.更新字段类接口 : 如更新头像接口,传同样的图多次仍然是该头像。
5.PUT 接口(标准 Restful) : 设计本身就要求是幂等的。
6.幂等消息消费接口 : MQ 消费端处理幂等,可以重复消费但不重复处理数据。
这些接口本质上是 “读” 或 “覆盖写”,状态不会因为调用次数改变,就算没有针对性的加上幂等性验证,也不会出错。
4.不需要幂等的接口(或幂等性不重要)
有些接口本身多次执行的副作用可接受,或者幂等性并不会显著提升安全性或用户体验。
1. 获取验证码接口 : 只要控制频率,无需考虑幂等。
2. 日志上报/行为统计接口 : 多报几次无所谓,服务端聚合即可。
3. 推荐接口 : 查询结果每次都可能不同,没必要强求幂等。
5. 解决幂等性问题
5.1. 幂等key机制
幂等Key(Idempotent Key) 是一个在客户端生成的唯一标识符,用来标记一次请求的唯一性。
幂等Key机制可以解决:
1. 避免业务副作用的重复发生。
2. 让服务端识别并跳过重复请求。
3. 提升系统健壮性:可以应对网络抖动、客户端重试、幂等测试、异常恢复等问题。
幂等Key 要满足的条件:
1.唯一性 : 每次不同操作请求必须有不同的 Key。
2.有时效性 : 避免 Redis / DB 中 Key 永久堆积。
3.可追踪性 : 最好能标记出请求来源 / 用户 / 业务类型。
4.幂等性绑定粒度合适 : 粒度太大会误判重复,太小会放过重复。
常见的幂等Key 设计方式:
1.客户端传 UUID : 该方案最灵活、比较通用。
2.订单号作为 Key : 对于下单类接口, 以订单号作为key值, 做唯一性校验。
3.用户ID + 操作类型 + 时间戳哈希 : 例如 user:123:create:xxxhash 进行幂等性校验。
4.签名摘要内容(body + token hash) : 接口请求幂等性校验(内容型幂等)。
幂等Key的常用存储方式:
1. Redis : 查询快速快速、支持原子性、自动过期, 比如 setIfAbsent() 方法。
2. 数据库 : 可持久化、可审计,对于高价值交易类行为比较适用(如支付流水)。
注: 建议采用AOP切面 / 拦截器统一处理,提升效率。
注意: 要由客户端生成唯一key,而不能由服务端去自动生成或者采用自增key,这时要考验客户端能不能识别用户“点两次”发起的是两个不同的请求,以及客户端如何管理“幂等 ID”生命周期。
常用方式:
1. 客户端显式生成并持久化幂等 ID
1. 适用于业务操作明确、一次性动作, 如下单、提交支付、申请退款、表单提交。
2.用户刷新前都用同一个请求 ID,多次点击只发送该 ID。
3.客户端要负责缓存生命周期,本地存储依赖浏览器机制。
2.由前端操作模型决定 UUID 生命周期
1. UUID 在点击前生成。
2. 多次点击不会重复生成 UUID。
3. 用户手动刷新页面或切换 tab 才重新生成。
3.由业务模型生成幂等ID
1.根据业务信息构造一个稳定幂等 ID,如 hash(user_id + 商品id + 当前时间段)。
2.只要用户在相同时间内提交同一个商品请求,生成的 ID 是一样的。
3.客户端存下来这个 ID,用于控制幂等。
5.2.基于业务唯一约束
直接依赖数据库的唯一性约束(unique key)来强行拒绝重复创建数据, 此方式适用于创建型接口, 对于更新操作无效, 该方式简单直接,可靠性高,但需要注意,此时的唯一Key 不能在业务中自动生成UUID或采用自增主键, 需由客户端生成并管理。
5.3.防重令牌机制
防重令牌(Idempotency Token)是由服务端提前生成的一种“唯一标识符”,交给客户端,用来标识一类幂等操作。客户端每次请求必须携带Token,服务端校验这个 Token 是否存在且未使用,后续相同 Token 的请求会被拒绝或返回缓存结果。
1.客户端先从服务端请求一个唯一 Token, 这个 Token 服务端会同时存到 Redis 中。
2.客户端提交表单时携带该 Token。
3.服务端验证该 Token 是否存在并且立刻删除(原子操作)。
4.验证成功则正常提交,失败则拒绝执行。
防重 Token 的校验必须是 Redis 层的原子操作,否则在并发下会出现逻辑漏洞。
String lua = """if redis.call('GET', KEYS[1]) thenreturn redis.call('DEL', KEYS[1])elsereturn 0end""";Long result = stringRedisTemplate.execute(new DefaultRedisScript<>(lua, Long.class),Collections.singletonList("idempotent:token:" + token)
);if (result == null || result == 0) {return error("重复或非法请求");
}
5.4. 幂等性中间件 / API 网关处理
把幂等性校验从业务服务内部抽离出来,放到服务调用链的前置层(如网关、Filter、拦截器、中间件),在请求到达业务服务之前,先判断是不是重复请求,是的话直接拦截掉。
5.4.1 API 网关层统一处理
在微服务中, 我们可以采用以下方案, NGINX + Lua 模块,Kong/APISIX/Envoy, Spring Cloud Gateway + Redis 等方案。
1. 判断是否携带幂等 key(或 Token)。
2. 使用 Redis 等缓存判断是否已经处理。
3. 拦截掉重复请求,或者直接返回上次结果(如存储响应缓存)。
4. 合法请求则将 key 记录为“已处理”,放行到后端服务。
5.4.2 统一中间件 / SDK 封装
多服务内统一接入 SDK(比如拦截器、注解),比如写一个 @Idempotent 注解,开发者只需要加在方法上。