11.2.4 聊天记录拉取设计与实现
1 服务端广播消息给客户端
消息格式
发送端:比如用户名:小鸭子米奇,用户id:5发送的消息,此时会携带cookie
{"type": "clientMessages","payload": {"roomId": "beast","messages": [{"content": "这是小鸭子发送的消息"}] }
}
经过服务端处理后转发给其他接收者的消息,此时消息类型type 变为“serverMessages”,message字段增 加了消息id,并增加了用户信息 "user": { "id": 5, "username": "小鸭子米奇"},,以及时间戳timestamp。
{"type": "serverMessages","payload": {"roomId": "beast","messages": [{"id": "1726839290525-0","content": "这是小鸭子发送的消息","user": {"id": 5,"username": "小鸭子米奇"},"timestamp": 1726839290524},sages": [{"id": "1726839290525-1","content": "这是小鸭子发送的消息","user": {"id": 5,"username": "小鸭子米奇"},"timestamp": 1726839290525}]}
}
发送端的json数据只所以不带用户信息,是因为其可以通过cookie从redis读取email,再根据 email去 MySQL查询到username和user id,这里这个设计可以了解,但这种做法虽然减少了客户端发送的数据量
发布订阅框架
存储消息
见函数: int ApiStoreMessage(string room_id, std::vector &msgs)
格式:
// 将 std::vector<Message> 序列化为 JSON 字符串
string SerializeMessageToJson(const Message msg) {Json::Value root; // JSON 根节点Json::StreamWriterBuilder writer; // JSON 写入器writer.settings_["indentation"] = ""; // 禁用缩进和换行// 填充消息字段root["content"] = msg.content;root["timestamp"] = (Json::UInt64) msg.timestamp;root["user_id"] = (Json::Int64) msg.user_id;root["username"] = msg.username;// 将 JSON 树转换为字符串return Json::writeString(writer, root);
}
消息广播逻辑
2 拉取历史消息
消息格式
客户端->服务端格式:
{"type": "requestRoomHistory","payload": {"roomId": "3bb1b0b6-e91c-11ef-ba07-bd8c0260908d","firstMessageId": "1739364544747-0","count": 30}
}
可以自定义firstMessageId从哪里开始获取消息,并设置获取消息的数量 count
服务端->客户端
{"payload" : {"hasMoreMessages" : true,"id" : "3bb1b0b6-e91c-11ef-ba07-bd8c0260908d","messages" : [{"content" : "是否可以","id" : "1739363780419-0","timestamp" : 1739363780419,"user" : {"id" : 9,"username" : "darren老师"}},{"content" : "测试","id" : "1739363133438-0","timestamp" : 1739363133438,"user" : {"id" : 9,"username" : "darren老师"}}],"name" : "音视频课程"},"type" : "serverRoomHistory"
}
如果hasMoreMessages为ture说明还有更多历史消息可以继续获取,如果hasMoreMessages为false,则没 有更多历史消息可以获取。
3 修复初次拉取的消息不能显示用户名
消息获取
之前的消息存储并没有username这个字段,这个版本在存储消息时增加了这个字段。
int ApiGetRoomHistory(Room &room, MessageBatch &message_batch)
{..........//增加if (!root["username"].isNull()) { // 如果用户名不为空,则设置用户名msg.username = root["username"].asString();}else {// LOG_WARN << "username null"; //早期有些消息没有存储用户名}........
}
4 改动步骤
4.1 修改Room和Message结构
文件路径: api\api_types.h 增加Room结构信息,增加Message结构信息
4.2 构建发布订阅模式中心
server/application/chat-room4/service/pub_sub_service.h
server/application/chat-room4/service/pub_sub_service.cc
4.3 加载聊天室+用户订阅聊天室 m
ain.c load_room_list(); 函数的实现
客户端连接成功后,要订阅聊天室
server/application/chat-room4/service/websocket_conn.cc
在void CWebSocketConn::OnRead(Buffer* buf)函数
4.4 聊天功能
解析客户端消息
service\websocket_conn.cc
int CWebSocketConn::handleClientMessages(Json::Value &root) 函数
{"type": "clientMessages","payload": {"roomId": "beast","messages": [{"content": "这是小鸭子发送的消息"}]}
}
根据格式进行解析
存储消息
api\api_msg.h api\api_msg.c
按照上一节课的格式存储消息,但相比上一节课命令演示的增加了username的存储
int ApiStoreMessage(string room_id, std::vector &msgs);
存储后使用命令读取
XREVRANGE key + - COUNT 5
广播消息
service\websocket_conn.cc
int CWebSocketConn::handleClientMessages(Json::Value &root) 函数
根据房间id获取房间成员,然后遍历发送
4.5 拉取历史记录
service\websocket_conn.cc
int CWebSocketConn::handleRequestRoomHistory(Json::Value &root) 函数。
4.6 修复登录初次拉取消息不显示用户名
int CWebSocketConn::sendHelloMessage()
修改
user["id"] = (Json::Int64)message_batch.messages[j].user_id; //这里该获取对应消息的iduser["username"] = message_batch.messages[j].username; //这里通过user map找到对应名字?
4.7 刷新页面不能正常发送消息问题
service\websocket_conn.cc
增加断开处理
void CWebSocketConn::OnRead函数
else if(frame.opcode == 0x08) { // 0x8 是否为关闭帧LOG_INFO<< "Received close frame, closing connection...";disconnect();
}void CWebSocketConn::disconnect() {if (tcp_conn_) {// 发送 WebSocket 关闭帧sendCloseFrame(1000, "Normal closure");tcp_conn_->shutdown();}{std::lock_guard<std::mutex> ulock(s_mtx_user_ws_conn_map_); //自动释放auto existing_conn = s_user_ws_conn_map.find(userid_);if (existing_conn != s_user_ws_conn_map.end()) {LOG_WARN << "s_user_ws_conn_map.erase userid:" << userid_ << "";// 如果已存在连接,先断开旧连接s_user_ws_conn_map.erase(existing_conn);}}
}
参考链接:0voice · GitHub