go游戏后端开发17: node节点搭建
搭建游戏服务节点(Node)与业务逻辑处理
1. 游戏服务节点(Node)的搭建
在搭建游戏服务节点(Node)时,我们需要实现一个工作节点,用于处理实际的游戏逻辑。这个节点与 connect
服务不同,它不涉及 WebSocket 的管理,而是作为 NATS 的客户端,处理游戏逻辑。
我们把这个节点称为 node
,它包含一个应用程序(app
),用于实现具体的业务逻辑。这个 app
与之前实现的 connector
类似,但功能上有所不同。
2. NATS 客户端集成
在 node
中,我们同样需要集成 NATS 客户端,用于接收和发送消息。我们为 NATS 客户端定义了两个通道(channel
):
-
remote_message
:用于接收和发送消息。 -
default
:用于处理默认逻辑或配置选项。
以下是 NATS 客户端的实现代码:
node/remote_client.go
:
package node
import (
"context"
"log"
"github.com/nats-io/nats.go"
)
type NATSClient struct {
nc *nats.Conn
subject string
}
func NewNATSClient(serverURL, subject string) *NATSClient {
return &NATSClient{
subject: subject,
}
}
func (c *NATSClient) Run() error {
var err error
c.nc, err = nats.Connect(serverURL)
if err != nil {
return err
}
_, err = c.nc.Subscribe(c.subject, func(msg *nats.Msg) {
log.Printf("Received message: %s", msg.Data)
// 处理接收到的消息
})
return err
}
func (c *NATSClient) Close() error {
if c.nc != nil {
return c.nc.Close()
}
return nil
}
func (c *NATSClient) SendMessage(message []byte) error {
if c.nc != nil {
return c.nc.Publish(c.subject, message)
}
return nil
}
3. 游戏服务节点(Node)的业务逻辑
在 node
中,我们需要实现具体的业务逻辑。以下是 node
的主程序代码:
node/main.go
:
package main
import (
"context"
"log"
"node"
)
func main() {
// 启动 NATS 客户端
natsClient := node.NewNATSClient("nats://localhost:4222", "node.update_user_address")
if err := natsClient.Run(); err != nil {
log.Fatalf("Failed to start NATS client: %v", err)
}
// 模拟接收消息
message := []byte(`{"cid":"client123","body":{"uid":"user123","location":{"latitude":"34.0522","longitude":"-118.2437","province":"CA","city":"Los Angeles"}},"src":"connect","dst":"node","router":"user_handler.update_user_address","type":"normal"}`)
if err := natsClient.SendMessage(message); err != nil {
log.Fatalf("Failed to send message: %v", err)
}
// 等待一段时间后关闭服务
<-context.Background().Done()
if err := natsClient.Close(); err != nil {
log.Fatalf("Failed to close NATS client: %v", err)
}
}
4. 消息处理与路由
在 node
中,我们需要处理接收到的消息,并根据路由信息调用相应的处理器。以下是消息处理的逻辑:
node/message_handler.go
:
package node
import (
"log"
)
type MessageHandler struct {
NATSClient *NATSClient
}
func NewMessageHandler(natsClient *NATSClient) *MessageHandler {
return &MessageHandler{
NATSClient: natsClient,
}
}
func (h *MessageHandler) HandleMessage(message []byte) error {
// 解析消息
var msg struct {
CID string `json:"cid"`
Body struct {
UID string `json:"uid"`
Location struct {
Latitude string `json:"latitude"`
Longitude string `json:"longitude"`
Province string `json:"province"`
City string `json:"city"`
} `json:"location"`
} `json:"body"`
Src string `json:"src"`
Dst string `json:"dst"`
Router string `json:"router"`
Type string `json:"type"`
}
if err := json.Unmarshal(message, &msg); err != nil {
log.Printf("Failed to unmarshal message: %v", err)
return err
}
log.Printf("Received message: %+v", msg)
// 根据路由调用处理器
switch msg.Router {
case "user_handler.update_user_address":
return h.UpdateUserAddress(msg)
default:
log.Printf("Unknown router: %s", msg.Router)
return nil
}
}
func (h *MessageHandler) UpdateUserAddress(msg struct {
CID string
Body struct {
UID string
Location struct {
Latitude string
Longitude string
Province string
City string
}
}
Src string
Dst string
Router string
Type string
}) error {
// 更新用户地址逻辑
log.Printf("Updating user address for UID: %s", msg.Body.UID)
// 这里可以调用数据库更新用户地址
return nil
}