当前位置: 首页 > news >正文

平台消息推送(go)

实现方案

在 Go 后端开发中实现消息推送,需要根据实时性要求、客户端类型(Web / 移动端)、用户规模等场景选择合适的技术方案。以下是常见实现方式及关键细节:

核心技术方案对比

首先明确不同场景的技术选型,常见方案如下:

方案实时性适用场景优势劣势
短轮询(Polling)非实时场景(如公告更新)实现简单,兼容性好无效请求多,服务器压力大
长轮询(Long Poll)近实时场景(如订单状态通知)减少请求次数,比短轮询高效连接持有时间长,服务器资源占用较高
WebSocket实时交互(如聊天、实时数据展示)全双工通信,低延迟,少带宽消耗需要维护长连接,兼容性依赖客户端
SSE(Server-Sent Events)服务器单向推送(如实时日志)基于 HTTP,实现简单,适合单向场景仅服务器→客户端,不支持双向
第三方推送服务移动端推送(iOS/Android)无需维护长连接,支持离线推送

具体实现

Go 的goroutine和channel天然适合处理高并发连接,以下是关键方案的实现细节:

1.短轮询(最简单方案)

客户端定期(如每 30 秒)通过 HTTP 请求拉取消息,服务器返回最新数据。
后端实现

// 消息存储(实际可用Redis/MySQL)
var messages = map[string][]string{ // userID -> 消息列表"user123": {"您有新订单", "账户余额更新"},
}// 短轮询接口
func pollHandler(w http.ResponseWriter, r *http.Request) {userID := r.URL.Query().Get("user_id")if userID == "" {http.Error(w, "user_id required", http.StatusBadRequest)return}// 返回用户未读消息w.Header().Set("Content-Type", "application/json")json.NewEncoder(w).Encode(map[string]interface{}{"messages": messages[userID],"timestamp": time.Now().Unix(),})// 标记为已读(实际需根据业务处理)messages[userID] = []string{}
}func main() {http.HandleFunc("/poll", pollHandler)http.ListenAndServe(":8080", nil)
}

客户端:用 JS 定时调用/poll?user_id=xxx即可。

2. 长轮询(优化短轮询)

客户端发起请求后,服务器若没有新消息则hold 住连接(直到有消息或超时),客户端收到响应后立即发起新请求。
Go 后端实现:利用channel等待消息,结合context控制超时:

// 消息通知通道(userID -> 消息通道)
var userChannels = make(map[string]chan string)
var mu sync.Mutex // 保护map并发安全// 长轮询接口
func longPollHandler(w http.ResponseWriter, r *http.Request) {userID := r.URL.Query().Get("user_id")if userID == "" {http.Error(w, "user_id required", http.StatusBadRequest)return}// 创建用户的消息通道(若不存在)mu.Lock()if _, ok := userChannels[userID]; !ok {userChannels[userID] = make(chan string, 10) // 缓冲避免阻塞}ch := userChannels[userID]mu.Unlock()// 设置超时(如30秒)ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)defer cancel()// 等待消息或超时select {case msg := <-ch:// 有新消息,返回w.Header().Set("Content-Type", "application/json")json.NewEncoder(w).Encode(map[string]string{"message": msg})case <-ctx.Done():// 超时,返回空(客户端会重新请求)w.WriteHeader(http.StatusNoContent)}
}// 发送消息的函数(如订单系统调用)
func sendMessage(userID, msg string) {mu.Lock()defer mu.Unlock()if ch, ok := userChannels[userID]; ok {select {case ch <- msg: // 发送消息到通道default: // 通道满了,可存储到数据库(离线消息)saveOfflineMessage(userID, msg)}} else {// 用户未连接,存离线消息saveOfflineMessage(userID, msg)}
}func main() {http.HandleFunc("/long-poll", longPollHandler)http.ListenAndServe(":8080", nil)
}

3.移动端推送(依赖第三方服务)

针对 iOS/Android,需集成 APNs(iOS)、FCM(Android)或国内的极光推送、个推等,Go 通过 HTTP/HTTPS 调用其 API。
调用极光推送 API

func pushToMobile(userID, title, content string) error {// 1. 构造请求参数(参考极光文档)reqBody := map[string]interface{}{"platform": "all", // iOS+Android"audience": map[string]interface{}{"alias": []string{userID}, // 用userID作为别名},"notification": map[string]interface{}{"alert": content,"ios": map[string]interface{}{"title": title,},"android": map[string]interface{}{"title": title,},},}jsonBody, _ := json.Marshal(reqBody)// 2. 发送POST请求到极光APIclient := &http.Client{}req, err := http.NewRequest("POST", "https://api.jpush.cn/v3/push", bytes.NewBuffer(jsonBody))if err != nil {return err}// 极光认证:AppKey:MasterSecret 转Base64auth := base64.StdEncoding.EncodeToString([]byte("your_appkey:your_mastersecret"))req.Header.Set("Authorization", "Basic "+auth)req.Header.Set("Content-Type", "application/json")// 3. 处理响应resp, err := client.Do(req)if err != nil {return err}defer resp.Body.Close()if resp.StatusCode != http.StatusOK {body, _ := io.ReadAll(resp.Body)return fmt.Errorf("push failed: %s", string(body))}return nil
}

关键问题处理

  1. 连接管理:
  • 用map+互斥锁维护用户与连接的映射(如 WebSocket 的ConnectionManager)。
  • 处理断线重连:客户端需实现重连逻辑,服务器检测到连接关闭后清理映射。
  1. 离线消息:
  • 当用户未在线时,消息需暂存(用 Redis/MySQL),用户上线后同步推送。
  1. 分布式扩展:
  • 当服务器集群部署时,单个服务器无法感知其他节点的连接,需用中间件同步消息:
  • 用 Redis 的 Pub/Sub:每个服务器节点订阅消息频道,收到消息后检查本地是否有目标用户连接,有则推送。
  • 示例:消息产生时发布到user:{userID}频道,所有节点订阅并处理。
  1. 安全性:
  • WebSocket 握手时验证用户身份(如 Token),避免非法连接。
  • 敏感消息需加密传输(如 HTTPS/WSS)。
  1. 性能优化:
  • 限制单用户最大连接数(避免恶意连接)。
  • 用缓冲 channel 减少阻塞(如长轮询的消息通道)。

websocket

在 Go 后端实现实时消息推送给用户,核心是解决 “服务器主动向客户端发送消息” 的问题。目前主流方案有WebSocket(全双工,适合高频实时场景)、Server-Sent Events (SSE)(服务器单向推送,适合简单场景)、长轮询(兼容性好,但实时性稍弱)。其中 WebSocket 因双向通信、低延迟的特性,是实时消息场景的首选。

核心方案:WebSocket + 连接管理 + 集群支持

以下是基于 WebSocket 的完整实现思路,包含单服务和集群场景的处理。

  1. 技术选型
  • 通信协议:WebSocket(基于gorilla/websocket库,比标准库更易用)。
  • 连接管理:用map维护 “用户 ID - 连接” 映射(需保证并发安全)。
  • 集群支持:结合 Redis Pub/Sub 实现跨服务节点的消息广播。
  • 消息格式:JSON(便于前后端解析)。
  1. 单服务器场景实现
    单服务器场景下,只需在本地维护用户连接,直接推送消息即可。

定义核心结构

import ("sync""github.com/gorilla/websocket"
)// 消息结构(前后端约定)
type Message struct {Type    string `json:"type"` // 消息类型(如"chat"、"notice")Content string `json:"content"`ToUser  string `json:"to_user"` // 目标用户IDFromUser string `json:"from_user"` // 发送者IDTime    int64  `json:"time"` // 时间戳
}// 连接管理器:维护用户与WebSocket连接的映射(支持多设备连接)
type ConnManager struct {mu      sync.RWMutex               // 并发安全锁clients map[string][]*websocket.Conn // key: 用户ID,value: 该用户的所有连接(多设备)
}// 全局连接管理器实例
var manager = &ConnManager{clients: make(map[string][]*websocket.Conn),
}

连接管理器方法(增删查)

// 添加用户连接
func (m *ConnManager) Add(userID string, conn *websocket.Conn) {m.mu.Lock()defer m.mu.Unlock()m.clients[userID] = append(m.clients[userID], conn)
}// 移除用户连接(连接关闭时调用)
func (m *ConnManager) Remove(userID string, conn *websocket.Conn) {m.mu.Lock()defer m.mu.Unlock()if conns, ok := m.clients[userID]; ok {// 遍历删除目标连接for i, c := range conns {if c == conn {m.clients[userID] = append(conns[:i], conns[i+1:]...)break}}// 若用户无连接,删除keyif len(m.clients[userID]) == 0 {delete(m.clients, userID)}}
}// 获取用户的所有连接
func (m *ConnManager) Get(userID string) []*websocket.Conn {m.mu.RLock()defer m.mu.RUnlock()return m.clients[userID] // 若用户无连接,返回空切片
}

WebSocket 连接处理(握手、认证、消息循环)

客户端需先通过 WebSocket 握手建立连接,服务器需验证用户身份(如 Token),并维护连接。

import ("net/http""time"
)// WebSocket升级器(配置握手参数)
var upgrader = websocket.Upgrader{ReadBufferSize:  1024,WriteBufferSize: 1024,// 允许跨域(生产环境需限制Origin)CheckOrigin: func(r *http.Request) bool {return true},
}// WebSocket处理器:处理客户端连接请求
func WebSocketHandler(w http.ResponseWriter, r *http.Request) {// 1. 从请求中获取用户身份(如Token解析)token := r.URL.Query().Get("token")userID, err := parseToken(token) // 自定义Token解析逻辑if err != nil {http.Error(w, "未授权", http.StatusUnauthorized)return}// 2. 升级HTTP连接为WebSocketconn, err := upgrader.Upgrade(w, r, nil)if err != nil {http.Error(w, "升级连接失败", http.StatusInternalServerError)return}defer conn.Close()// 3. 将连接添加到管理器manager.Add(userID, conn)defer manager.Remove(userID, conn) // 连接关闭时移除// 4. 维持连接(读取客户端消息,或仅心跳检测)for {// 读取客户端消息(若不需要双向通信,可忽略)_, _, err := conn.ReadMessage()if err != nil {// 客户端断开连接(如刷新页面、网络中断)break}}
}// 解析Token获取用户ID(示例)
func parseToken(token string) (string, error) {// 实际场景:用jwt等库解析token,验证有效性后返回userIDreturn "user_123", nil // 模拟返回用户ID
}

消息推送逻辑

当有消息需要推送时,通过用户 ID 从管理器获取连接,逐个发送消息。

// 向指定用户推送消息
func PushToUser(userID string, msg Message) error {conns := manager.Get(userID)if len(conns) == 0 {return nil // 无连接,无需推送(可记录离线消息)}// 遍历用户的所有连接,发送消息for _, conn := range conns {// 设置写超时(防止阻塞)if err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {continue}// 发送JSON格式消息if err := conn.WriteJSON(msg); err != nil {// 发送失败(如连接已失效),移除该连接manager.Remove(userID, conn)continue}}return nil
}

启动服务

func main() {// 注册WebSocket路由http.HandleFunc("/ws", WebSocketHandler)// 启动HTTP服务(生产环境需用HTTPS,WebSocket需对应wss协议)http.ListenAndServe(":8080", nil)
}

集群场景扩展(多服务器)

当后端是多节点集群时,单节点的连接管理器无法感知其他节点的用户连接,需通过Redis Pub/Sub实现跨节点消息同步。
原理:每个服务器节点订阅 Redis 的 “消息频道”,当某节点需要推送消息时,先将消息发布到 Redis 频道,所有节点收到消息后,检查本地是否有目标用户的连接,有则推送。

初始化 Redis Pub/Sub

import ("context""github.com/go-redis/redis/v8"
)var redisClient = redis.NewClient(&redis.Options{Addr: "localhost:6379", // Redis地址
})
var ctx = context.Background()
const msgChannel = "user_messages" // Redis消息频道名

每个节点启动 Redis 订阅协程

// 启动Redis订阅,接收跨节点消息并推送
func StartRedisSubscriber() {pubSub := redisClient.Subscribe(ctx, msgChannel)defer pubSub.Close()// 循环接收消息for {msg, err := pubSub.ReceiveMessage(ctx)if err != nil {// 处理错误(如重连)continue}// 解析消息为Message结构var m Messageif err := json.Unmarshal([]byte(msg.Payload), &m); err != nil {continue}// 推送消息到本地用户(若有连接)PushToUser(m.ToUser, m)}
}

修改消息推送逻辑(发布到 Redis)

// 集群模式下的消息推送:先发布到Redis,再由各节点本地推送
func ClusterPushToUser(userID string, msg Message) error {// 1. 发布消息到Redis频道(供其他节点接收)msgBytes, err := json.Marshal(msg)if err != nil {return err}if err := redisClient.Publish(ctx, msgChannel, msgBytes).Err(); err != nil {return err}// 2. 本地节点也需推送(避免自己订阅自己的消息延迟)return PushToUser(userID, msg)
}

附加功能(生产环境必备)

  1. 心跳检测:防止连接假死(服务器定期发送ping,客户端回复pong,超时未回复则关闭连接)
// 在WebSocket连接循环中添加心跳检测
conn.SetPongHandler(func(string) error {conn.SetReadDeadline(time.Now().Add(30 * time.Second)) // 延长读超时return nil
})
// 定时发送ping
go func() {ticker := time.NewTicker(15 * time.Second)defer ticker.Stop()for range ticker.C {if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {return // 发送失败,退出心跳}}
}()
  1. 离线消息存储:若用户无在线连接,将消息存入数据库(如 MySQL、MongoDB),用户上线后同步。
  2. 连接限流:限制单用户最大连接数(如 3 个设备),防止恶意创建连接。
  3. 权限校验:推送前验证发送者是否有权限向目标用户发送消息(如好友关系)

总结

推荐优先使用WebSocket + Redis Pub/Sub方案,既满足实时性需求,又支持集群扩展。核心是通过连接管理器维护用户连接,结合消息发布订阅实现跨节点同步,同时完善心跳、认证、离线存储等辅助功能。

场景:商品更新,消息推送

在 Go 后端实现 “商品更新时向收藏用户推送消息” 的功能,核心是打通 “商品更新事件” 到 “用户消息推送” 的链路,需要结合业务存储(收藏关系)、事件触发、用户筛选和消息推送能力。以下是具体实现方案:

一、整体流程设计

整个功能的核心链路可拆解为 4 步:

  1. 存储收藏关系:记录 “用户 - 商品” 的收藏映射(谁收藏了哪个商品)。
  2. 捕获商品更新事件:当商品信息(价格、库存、描述等)更新时,触发 “商品更新事件”。
  3. 筛选目标用户:根据商品 ID,查询所有收藏该商品的用户 ID 列表。
  4. 批量推送消息:向这些用户推送 “商品更新” 通知(基于之前的 WebSocket 实时推送能力)。

二、具体实现步骤

  1. 数据模型设计(存储收藏关系)
    需要一张表记录用户对商品的收藏关系,这里以 MySQL 为例:
-- 用户收藏表
CREATE TABLE user_collections (id INT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(32) NOT NULL COMMENT '用户ID',product_id VARCHAR(32) NOT NULL COMMENT '商品ID',created_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '收藏时间',UNIQUE KEY uk_user_product (user_id, product_id) -- 避免重复收藏
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Go 中定义对应的数据结构和数据库操作(使用gorm示例):

import ("gorm.io/gorm""time"
)// UserCollection 收藏关系模型
type UserCollection struct {ID         int64     `gorm:"primaryKey" json:"id"`UserID     string    `gorm:"index:idx_product;size:32" json:"user_id"` // 索引:按商品查用户ProductID  string    `gorm:"size:32" json:"product_id"`CreatedAt  time.Time `json:"created_at"`
}// 数据库操作:根据商品ID查询所有收藏用户
func GetCollectionUsers(db *gorm.DB, productID string) ([]string, error) {var userIDs []stringerr := db.Model(&UserCollection{}).Where("product_id = ?", productID).Pluck("user_id", &userIDs). // 只查询user_id字段Errorreturn userIDs, err
}
  1. 捕获商品更新事件(事件驱动设计)
    当商品信息更新时,需要触发一个 “商品更新事件”,避免业务逻辑耦合。可以用本地事件总线(单服务)或消息队列(集群 / 高可用)实现。

方案 1:单服务用本地事件总线(简单场景)
用 Go 的channel实现轻量事件总线:

// 事件定义
type ProductUpdateEvent struct {ProductID  string      `json:"product_id"` // 商品IDProduct    interface{} `json:"product"`    // 更新后的商品信息(如名称、价格等)UpdateTime time.Time   `json:"update_time"`
}// 事件总线(全局单例)
type EventBus struct {productUpdateCh chan ProductUpdateEvent // 商品更新事件通道
}var globalEventBus = &EventBus{productUpdateCh: make(chan ProductUpdateEvent, 1000), // 缓冲通道,避免阻塞
}// 发布商品更新事件
func PublishProductUpdateEvent(event ProductUpdateEvent) {select {case globalEventBus.productUpdateCh <- event:default:// 通道满时的降级处理(如日志记录,避免事件丢失)log.Printf("event bus full, drop product update event: %s", event.ProductID)}
}

在商品更新接口中发布事件

// 商品更新接口示例
func UpdateProductHandler(db *gorm.DB) http.HandlerFunc {return func(w http.ResponseWriter, r *http.Request) {// 1. 解析请求,获取商品ID和更新内容var req struct {ProductID string  `json:"product_id"`Price     float64 `json:"price"` // 示例:更新价格// 其他字段...}if err := json.NewDecoder(r.Body).Decode(&req); err != nil {http.Error(w, "invalid request", http.StatusBadRequest)return}// 2. 更新商品数据库product := Product{ID: req.ProductID, Price: req.Price} // 假设Product是商品模型if err := db.Model(&product).Updates(product).Error; err != nil {http.Error(w, "update product failed", http.StatusInternalServerError)return}// 3. 发布商品更新事件PublishProductUpdateEvent(ProductUpdateEvent{ProductID:  req.ProductID,Product:    product, // 可只传关键更新字段(如Price)UpdateTime: time.Now(),})w.WriteHeader(http.StatusOK)}
}

方案 2:集群场景用消息队列(如 RabbitMQ/Kafka)
如果是分布式系统,用消息队列(如 RabbitMQ)确保事件跨服务传递:

// 初始化RabbitMQ生产者(发布事件)
func InitRabbitMQProducer() (*amqp.Channel, error) {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {return nil, err}ch, err := conn.Channel()if err != nil {return nil, err}// 声明商品更新事件队列_, err = ch.QueueDeclare("product_update_events", // 队列名true,                    // 持久化false,                   // 不自动删除false,                   // 不排他false,                   // 不阻塞nil,)return ch, err
}// 发布事件到RabbitMQ
func PublishProductUpdateEventMQ(ch *amqp.Channel, event ProductUpdateEvent) error {eventBytes, err := json.Marshal(event)if err != nil {return err}return ch.Publish("",                      // 交换机(默认)"product_update_events", // 队列名false,false,amqp.Publishing{ContentType:  "application/json",Body:         eventBytes,DeliveryMode: amqp.Persistent, // 持久化消息},)
}
  1. 监听事件并筛选目标用户
    启动一个协程监听商品更新事件,收到事件后查询收藏该商品的用户:
// 启动事件处理器(单服务场景)
func StartProductUpdateHandler(db *gorm.DB) {go func() {for event := range globalEventBus.productUpdateCh {// 1. 查询收藏该商品的所有用户userIDs, err := GetCollectionUsers(db, event.ProductID)if err != nil {log.Printf("get collection users failed: %v", err)continue}if len(userIDs) == 0 {continue // 无收藏用户,无需推送}// 2. 生成推送消息product, ok := event.Product.(Product)if !ok {log.Printf("invalid product data: %v", event.Product)continue}msg := Message{Type:    "product_update", // 消息类型:商品更新Content: fmt.Sprintf("您收藏的商品「%s」价格已更新为%.2f元", product.Name, product.Price),ToUser:  "", // 批量推送,后续逐个设置FromUser: "system", // 系统发送Time:     event.UpdateTime.Unix(),}// 3. 向每个用户推送消息for _, userID := range userIDs {msg.ToUser = userIDif err := ClusterPushToUser(userID, msg); err != nil { // 复用之前的集群推送函数log.Printf("push to user %s failed: %v", userID, err)}}}}()
}// 集群场景:从RabbitMQ消费事件(类似逻辑)
func StartProductUpdateMQConsumer(db *gorm.DB, ch *amqp.Channel) {msgs, err := ch.Consume("product_update_events", // 队列名"",                      // 消费者标签true,                    // 自动确认false,                   // 不排他false,                   // 不阻塞false,                   // 无参数nil,)if err != nil {log.Fatalf("failed to register consumer: %v", err)}go func() {for d := range msgs {var event ProductUpdateEventif err := json.Unmarshal(d.Body, &event); err != nil {log.Printf("unmarshal event failed: %v", err)continue}// 后续逻辑同上:查询用户→推送消息}}()
}
  1. 处理离线用户(消息持久化)
    如果用户当前不在线(无 WebSocket 连接),需要将消息存入数据库,待用户上线后同步:
// 离线消息表
type OfflineMessage struct {ID        int64     `gorm:"primaryKey" json:"id"`UserID    string    `gorm:"index" json:"user_id"` // 按用户查询Message   string    `json:"message"` // 消息内容(JSON字符串)Read      bool      `json:"read"`    // 是否已读CreatedAt time.Time `json:"created_at"`
}// 推送消息时,若用户离线则存储离线消息
func PushToUserWithOffline(userID string, msg Message, db *gorm.DB) error {conns := manager.Get(userID)if len(conns) > 0 {// 在线,直接推送return PushToUser(userID, msg)}// 离线,存储到数据库msgBytes, err := json.Marshal(msg)if err != nil {return err}return db.Create(&OfflineMessage{UserID:    userID,Message:   string(msgBytes),Read:      false,CreatedAt: time.Unix(msg.Time, 0),}).Error
}

用户上线时(WebSocket 连接建立后),同步离线消息:

// WebSocket连接建立后,同步离线消息
func syncOfflineMessages(userID string, conn *websocket.Conn, db *gorm.DB) {var offlineMsgs []OfflineMessageif err := db.Where("user_id = ? AND read = ?", userID, false).Order("created_at ASC").Find(&offlineMsgs).Error; err != nil {log.Printf("sync offline messages failed: %v", err)return}// 推送离线消息for _, msg := range offlineMsgs {var m Messageif err := json.Unmarshal([]byte(msg.Message), &m); err != nil {continue}if err := conn.WriteJSON(m); err != nil {return}// 标记为已读db.Model(&OfflineMessage{}).Where("id = ?", msg.ID).Update("read", true)}
}// 在WebSocketHandler中添加同步逻辑
func WebSocketHandler(w http.ResponseWriter, r *http.Request, db *gorm.DB) {// ... 之前的认证、升级连接逻辑 ...// 同步离线消息syncOfflineMessages(userID, conn, db)// ... 后续的连接维护逻辑 ...
}

三、优化点(生产环境必备)

  • 批量查询优化:如果商品收藏用户过多(如 10 万 +),GetCollectionUsers需分页查询(LIMIT + OFFSET),避免一次性加载大量数据导致内存溢出。
  • 推送频率限制:若商品短时间内频繁更新(如 1 分钟内改 3 次价格),可合并消息(只推最后一次更新),避免骚扰用户。可通过 Redis 记录商品最近推送时间,过滤高频更新:
// 检查是否需要推送(10分钟内只推一次)
func needPush(productID string) bool {key := fmt.Sprintf("product_push_limit:%s", productID)// 若key不存在,设置10分钟过期;若存在,返回falseexists, _ := redisClient.SetNX(ctx, key, 1, 10*time.Minute).Result()return exists
}
  • 消息模板化:将消息内容定义为模板(如"您收藏的商品「{{.Name}}」已更新:{{.Changes}}"),通过模板引擎(如text/template)动态生成,便于维护。
  • 监控与告警:统计消息推送成功率、离线消息堆积量,通过 Prometheus+Grafana 监控,异常时告警(如推送失败率超过 5%)。

总结

整个功能的核心是 “事件触发→用户筛选→消息推送” 的闭环:
• 用数据库存储用户 - 商品收藏关系,确保能快速找到目标用户;
• 用事件总线 / 消息队列解耦商品更新和消息推送逻辑,提高可维护性;
• 结合 WebSocket 实时推送 + 离线消息存储,保证用户无论在线与否都能收到通知;
• 通过分页、频率限制等优化,适配高并发场景。
按这个思路实现,即可稳定支持 “商品更新时向收藏用户推送消息” 的需求。

用户登录、离线状态推送

要实现 “商品更新时,在线用户立即提醒,离线用户登录后提醒” 的功能,核心是精准判断用户状态并区分处理逻辑:在线状态依赖实时连接推送,离线状态依赖消息持久化 + 登录时同步。以下是具体实现方案,包含状态判断、消息处理、登录同步的完整链路。

一、核心逻辑拆解

  1. 用户状态定义:
    ◦ 在线:用户有活跃的 WebSocket 连接(连接管理器中存在该用户的有效连接)。
    ◦ 离线:用户无活跃 WebSocket 连接(连接管理器中无该用户的连接)。
  2. 处理流程:商品更新 → 筛选收藏用户 → 逐个判断用户状态 → 在线则实时推送 → 离线则存储离线消息 → 用户登录时同步离线消息。

二、具体实现步骤

  1. 完善用户状态判断(基于连接管理器)
    复用之前的ConnManager,通过检查用户是否有活跃连接判断状态:
// 判断用户是否在线(有活跃WebSocket连接)
func (m *ConnManager) IsOnline(userID string) bool {m.mu.RLock()defer m.mu.RUnlock()conns, ok := m.clients[userID]// 存在连接且至少有一个有效连接(实际可更严格:检查连接是否存活)return ok && len(conns) > 0
}

注:严格来说,需判断连接是否 “存活”(如最近 30 秒内有心跳交互),可在ConnManager中为连接添加最后活跃时间,定期清理死连接。

  1. 商品更新事件处理(区分在线 / 离线用户)
    当商品更新事件触发后,对每个收藏用户执行 “在线推送 / 离线存储” 逻辑:
// 处理商品更新事件的核心函数
func handleProductUpdateEvent(event ProductUpdateEvent, db *gorm.DB) {// 1. 筛选收藏该商品的用户(分页查询,避免大量用户导致内存溢出)userIDs, err := getCollectionUsersWithPage(db, event.ProductID, 0, 100) // 分页:页号0,每页100条if err != nil {log.Printf("查询收藏用户失败: %v", err)return}// 2. 生成推送消息(统一格式)msg := buildProductUpdateMsg(event) // 构建消息内容(见下方)// 3. 逐个处理用户for _, userID := range userIDs {// 3.1 检查用户是否在线if manager.IsOnline(userID) {// 在线:实时推送if err := pushOnlineMessage(userID, msg); err != nil {log.Printf("向用户[%s]实时推送失败: %v", userID, err)}} else {// 离线:存储离线消息if err := saveOfflineMessage(userID, msg, db); err != nil {log.Printf("存储用户[%s]离线消息失败: %v", userID, err)}}}// 4. 处理下一页(若有更多用户)// (逻辑:循环分页查询,直到获取所有用户)
}// 构建商品更新消息
func buildProductUpdateMsg(event ProductUpdateEvent) Message {product := event.Product.(Product) // 假设Product包含Name、Price等字段return Message{Type:    "product_update",Content: fmt.Sprintf("您收藏的商品「%s」已更新,最新价格:%.2f元", product.Name, product.Price),FromUser: "system",Time:     event.UpdateTime.Unix(),}
}
  1. 在线用户:实时推送(复用 WebSocket)
    直接使用之前的PushToUser函数,向用户的所有活跃连接推送消息:
// 向在线用户推送消息
func pushOnlineMessage(userID string, msg Message) error {msg.ToUser = userID // 填充目标用户IDreturn PushToUser(userID, msg) // 复用WebSocket推送逻辑
}
  1. 离线用户:存储离线消息(数据库持久化)
    设计离线消息表,存储未送达的消息,待用户登录时同步:
    (1)离线消息表设计(MySQL)
CREATE TABLE offline_messages (id BIGINT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(32) NOT NULL COMMENT '目标用户ID',message JSON NOT NULL COMMENT '消息内容(JSON格式)',created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',is_read TINYINT NOT NULL DEFAULT 0 COMMENT '是否已读(0-未读,1-已读)',INDEX idx_user_created (user_id, created_at) -- 按用户+时间查询
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

(2)Go 模型与存储逻辑

// OfflineMessage 离线消息模型
type OfflineMessage struct {ID        int64     `gorm:"primaryKey" json:"id"`UserID    string    `gorm:"index:idx_user_created" json:"user_id"`Message   string    `gorm:"type:json" json:"message"` // 存储Message的JSON字符串CreatedAt time.Time `gorm:"index:idx_user_created" json:"created_at"`IsRead    bool      `json:"is_read"`
}// 存储离线消息
func saveOfflineMessage(userID string, msg Message, db *gorm.DB) error {// 将消息序列化为JSONmsgBytes, err := json.Marshal(msg)if err != nil {return err}// 存入数据库return db.Create(&OfflineMessage{UserID:    userID,Message:   string(msgBytes),CreatedAt: time.Unix(msg.Time, 0),IsRead:    false,}).Error
}
  1. 用户登录时:同步离线消息
    当用户建立 WebSocket 连接(登录状态确认)后,立即查询并推送其未读的离线消息:
    (1)WebSocket 连接建立时触发同步
// WebSocket处理器(增加离线消息同步)
func WebSocketHandler(w http.ResponseWriter, r *http.Request, db *gorm.DB) {// 1. 认证用户(解析Token获取userID)userID, err := authenticateUser(r) // 自定义认证逻辑if err != nil {http.Error(w, "未授权", http.StatusUnauthorized)return}// 2. 升级为WebSocket连接conn, err := upgrader.Upgrade(w, r, nil)if err != nil {http.Error(w, "连接升级失败", http.StatusInternalServerError)return}defer conn.Close()// 3. 添加连接到管理器(标记用户为在线)manager.Add(userID, conn)defer manager.Remove(userID, conn)// 4. 同步离线消息(核心:用户登录后立即推送未读消息)if err := syncOfflineMessages(userID, conn, db); err != nil {log.Printf("用户[%s]离线消息同步失败: %v", userID, err)}// 5. 维持连接(心跳+消息循环)maintainWebSocketConn(conn, userID)
}

(2)离线消息同步逻辑

// 同步用户未读的离线消息
func syncOfflineMessages(userID string, conn *websocket.Conn, db *gorm.DB) error {// 1. 查询未读离线消息(按时间升序,确保顺序正确)var offlineMsgs []OfflineMessageerr := db.Where("user_id = ? AND is_read = ?", userID, false).Order("created_at ASC").Find(&offlineMsgs).Errorif err != nil {return fmt.Errorf("查询离线消息失败: %w", err)}if len(offlineMsgs) == 0 {return nil // 无未读消息}// 2. 逐个推送到客户端for _, om := range offlineMsgs {// 反序列化消息var msg Messageif err := json.Unmarshal([]byte(om.Message), &msg); err != nil {log.Printf("解析离线消息失败: %v", err)continue}// 推送消息if err := conn.WriteJSON(msg); err != nil {return fmt.Errorf("推送离线消息失败: %w", err)}// 标记为已读if err := db.Model(&OfflineMessage{}).Where("id = ?", om.ID).Update("is_read", true).Error; err != nil {log.Printf("标记消息已读失败: %v", err)}}return nil
}

三、关键细节与优化

  1. 连接存活检测(避免误判在线状态)
    用户可能存在 “假在线” 状态(如网络中断但连接未关闭),需通过心跳机制清理死连接:
// 维持WebSocket连接(包含心跳检测)
func maintainWebSocketConn(conn *websocket.Conn, userID string) {// 设置Pong回调(客户端回复心跳)conn.SetPongHandler(func(string) error {// 延长读超时(30秒内收到Pong视为存活)return conn.SetReadDeadline(time.Now().Add(30 * time.Second))})// 定时发送Ping(每15秒一次)pingTicker := time.NewTicker(15 * time.Second)defer pingTicker.Stop()// 读消息循环(同时检测连接是否存活)for {select {case <-pingTicker.C:// 发送Ping,若失败则关闭连接if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {log.Printf("用户[%s]心跳发送失败,关闭连接: %v", userID, err)return}default:// 读取客户端消息(无消息时阻塞,超时后返回错误)_, _, err := conn.ReadMessage()if err != nil {log.Printf("用户[%s]连接断开: %v", userID, err)return}}}
}
  1. 批量处理与性能优化
    • 分页查询收藏用户:若商品收藏用户过多(如 10 万 +),单次查询会导致内存暴涨,需分页处理(LIMIT + OFFSET或游标分页)。
    • 批量插入离线消息:当离线用户数量大时,用gorm的CreateInBatches批量插入,减少数据库交互次数:
// 批量存储离线消息
func batchSaveOfflineMessages(userIDs []string, msg Message, db *gorm.DB) error {msgBytes, _ := json.Marshal(msg)var offlineMsgs []OfflineMessagefor _, userID := range userIDs {offlineMsgs = append(offlineMsgs, OfflineMessage{UserID:    userID,Message:   string(msgBytes),CreatedAt: time.Unix(msg.Time, 0),})}return db.CreateInBatches(offlineMsgs, 100).Error // 每100条批量插入
}
  1. 消息幂等性(避免重复推送)
    • 为每条消息生成唯一 ID(如msg.ID = uuid.New().String()),推送 / 存储时检查该 ID 是否已处理,避免因事件重试导致重复。
    • 离线消息同步时,即使重复推送,客户端也可根据msg.ID去重。
  2. 离线消息清理(避免存储膨胀)
    • 定期清理过期消息(如 30 天前的已读消息),可用定时任务(cron)执行:
// 清理30天前的已读离线消息
func cleanExpiredOfflineMessages(db *gorm.DB) error {expiredTime := time.Now().Add(-30 * 24 * time.Hour)return db.Where("is_read = ? AND created_at < ?", true, expiredTime).Delete(&OfflineMessage{}).Error
}

四、总结

整个流程通过 “连接管理器判断在线状态”“实时推送 + 离线存储”“登录同步” 三个核心环节,实现了商品更新消息的精准触达:
• 在线用户依赖 WebSocket 实时推送,确保时效性;
• 离线用户通过数据库存储消息,登录时自动同步,确保不丢失;
• 配合心跳检测、分页处理、幂等性设计,可支撑高并发场景。
按此方案实现,既能满足用户体验(实时提醒),又能保证消息可靠性(离线不丢失)。

http://www.dtcms.com/a/600632.html

相关文章:

  • uniapp集成爱山东获取用户信息
  • Python编程实战 - Python实用工具与库 - 操作Excel:openpyxl / pandas
  • 开展我国电子网站建设wordpress表白
  • Java 在 Excel 中添加或删除批注:Spire.XLS for Java 实践指南
  • uniapp 使用unocss的问题
  • [Linux——Lesson23.线程概念与控制:线程基础]
  • 四大主流浏览器Chrome、Edge、Safari、Firefox内核检测免费工具评测
  • 弱网通话没保障?多网聚合,逐包调度,新技术扫除网络痛点
  • 网站制作公司的网站贵阳网站改版
  • 电脑硬件价格呈现持续上涨趋势及软件优化的必要性
  • Spring集成kafka的最佳方式
  • 设计网站怎么做网业是什么行业
  • RK3588应用分享之国产化系统-开源鸿蒙OpenHarmony
  • RabbitMQ-基础-总结
  • 学习react第二天
  • 【JVS更新日志】低代码、APS排产、物联网、企业计划11.12更新说明!
  • 前端注释规范:如何写“后人能看懂”的注释(附示例)
  • C语言编译器下载地址 | 如何选择适合自己的C语言编译器
  • HarmonyOS之深入解析如何实现语音朗读能力
  • 台州企业网站的建设做网站能挣多少钱
  • 网站开发内容包括哪些wordpress 统计代码
  • 【昇腾CANN工程实践】BERT情感分析API性能优化实录:从CPU到NPU的15倍加速
  • 【Linux基础开发工具 (二)】详解Linux文本编辑器:Vim从入门到精通——完整教程与实战指南(上)
  • 使用 BR 备份 TiDB 到阿里云 OSS 存储
  • 机器学习项目——基于集成学习提升树情绪分类(代码/论文)
  • C++ 抽象类与多态原理深度解析:从纯虚函数到虚表机制(附高频面试题)
  • 尚硅谷 SpringCloud 01 分布式概念-工程创建-nacos安装-nacos服务注册与发现 -远程调用
  • C# Sqlite帮助类
  • 传统方式部署 Hadoop 高可用集群
  • 微软 Win11 经典版 Outlook 曝 BUG,加速 SSD 损耗