当前位置: 首页 > news >正文

go游戏后端开发21:处理nats消息

处理 NATS 订阅的消息

在 WebSocket 的管理模块中,我们之前已经处理了一些消息。这些消息通过 NATS 订阅过来,我们需要对这些消息进行进一步的处理。一旦消息到达,我们需要执行相应的操作,并将结果发送回去,包括之前的操作。

理论上,所有消息都应该能够到达这里进行处理。目前,我们还没有对这部分进行处理,接下来需要完善这部分逻辑。

3. 解析消息并处理

当收到消息后,我们需要解析消息内容。解析完成后,我们需要根据消息的类型(type)进行不同的处理。

  • 特殊类型处理

    • 如果消息类型是“三审”(假设的类型),我们需要进行特殊处理。因为这种类型的消息可能存储在 WebSocket 的连接中,而不是直接推送给客户端。

    • 如果消息类型是“replace”或“response”,我们需要将消息类型改为“response”,以便正确地将消息发送回客户端。

  • 普通类型处理

    • 如果消息类型是“pose”,我们需要将消息放入一个专门的通道(Channel)中进行处理。这样可以提高系统的承载能力,避免消息积压。

4. 消息推送逻辑

在处理完消息后,我们需要将消息推送给客户端。具体步骤如下:

  1. 获取客户端 ID

    • 从消息中获取当前客户端的 ID。如果客户端不存在,我们需要记录日志并提示客户端已下线。

  2. 编码消息

    • 对消息进行编码处理,确保消息格式正确。

  3. 发送消息

    • 将编码后的消息发送给客户端。如果消息类型是“pose”,我们需要循环处理所有相关客户端,并将消息推送给每个客户端。

5. 代码实现

以下是优化后的代码实现:

package websocket

import (
	"encoding/json"
	"fmt"
	"log"
	"sync"

	"github.com/nats-io/nats.go"
	"github.com/gorilla/websocket"
)

type WebSocketManager struct {
	sync.RWMutex
	clients    map[*websocket.Conn]string
	messageCh  chan *Message
	pushCh     chan *Message
	natsConn   *nats.Conn
}

type Message struct {
	Type    string          `json:"type"`
	Data    json.RawMessage `json:"data"`
	ClientID string          `json:"clientID"`
}

func NewWebSocketManager(natsConn *nats.Conn) *WebSocketManager {
	return &WebSocketManager{
		clients:   make(map[*websocket.Conn]string),
		messageCh: make(chan *Message, 1024),
		pushCh:    make(chan *Message, 1024),
		natsConn:  natsConn,
	}
}

func (wm *WebSocketManager) handleMessage(msg *Message) {
	log.Printf("Handling message: %+v", msg)

	// 获取客户端连接
	clientID := msg.ClientID
	clientConn, exists := wm.clients[clientID]
	if !exists {
		log.Printf("Client not found: %s", clientID)
		return
	}

	// 根据消息类型处理
	switch msg.Type {
	case "response":
		// 处理响应消息
		wm.sendToClient(clientConn, msg)
	case "pose":
		// 处理推送消息
		wm.pushCh <- msg
	default:
		log.Printf("Unknown message type: %s", msg.Type)
	}
}

func (wm *WebSocketManager) sendToClient(clientConn *websocket.Conn, msg *Message) {
	encodedMsg, err := json.Marshal(msg)
	if err != nil {
		log.Printf("Failed to encode message: %v", err)
		return
	}

	err = clientConn.WriteMessage(websocket.TextMessage, encodedMsg)
	if err != nil {
		log.Printf("Failed to send message to client: %v", err)
	}
}

func (wm *WebSocketManager) processPushMessages() {
	for msg := range wm.pushCh {
		for clientConn := range wm.clients {
			wm.sendToClient(clientConn, msg)
		}
	}
}

func (wm *WebSocketManager) start() {
	go wm.processPushMessages()

	// NATS 订阅消息
	nc := wm.natsConn
	nc.Subscribe("ws.messages", func(msg *nats.Msg) {
		var wsMsg Message
		err := json.Unmarshal(msg.Data, &wsMsg)
		if err != nil {
			log.Printf("Failed to unmarshal NATS message: %v", err)
			return
		}
		wm.handleMessage(&wsMsg)
	})
}

相关文章:

  • 数据结构实验1.2: 顺序表的基本运算
  • 【CSS】- 表单控件的 placeholder 如何控制换行显示?
  • 自动驾驶---学术论文的常客:nuScenes数据集的使用
  • 卫星电话究竟是“锦上添花”?还是“刚需之选”?
  • android 设置状态栏背景
  • JAVA-Spring Boot多线程
  • React(九)React Hooks
  • RabbitMQ的工作模式
  • nginx的自动跳转https
  • 客户端给服务器发数据,服务器不显示:开放端口操作
  • 【工作梳理】怎么把f12里面的东西导入到postman
  • 赛逸展2025“创新引擎”启动:限量席位,点亮科技绿色新征程
  • 七. JAVA类和对象(一)
  • 权限框架SpringSecurity介绍
  • 【功能开发】DSP F2837x 检测中断所有函数运行一次的时间
  • 多模态大语言模型arxiv论文略读(二)
  • 基于Edge-TTS的OpenAI兼容文本转语音API实战指南
  • QwQ-32B-GGUF模型部署
  • 快速入手-基于DRF的过滤、分页、查询配置(十五)
  • 2025年渗透测试面试题总结-某 携程旅游-基础安全工程师(题目+回答)
  • 本科生已发14篇SCI论文被指由其教授父亲挂名,重庆大学成立工作组核实
  • A股26家游戏企业去年营收近1900亿元:过半净利下滑,出海成为主流选择
  • 上海一中院一审公开开庭审理被告人胡欣受贿案
  • 以色列计划“占领加沙”,特朗普下周中东行结束之际将是“机会窗口”
  • 马克思主义理论研究教学名师系列访谈|石书臣:思政课是落实立德树人的关键
  • 山东滕州一车辆撞向公交站台撞倒多人,肇事者被控制,案件已移交刑警