项目实战5:聊天室
目录
项目运行
项目框架
模块逻辑
业务逻辑
数据存储
代码编写
1 情景在线
2 HTTP请求(http_handler.h)
2.1注册/登录模块(http_conn.c)
2.1.1注册(api_reg.cc)
(1)解析
(2)验证
(3)返回
2.1.2登录(api_login.cc)
(1)解析
(2)验证
(3)返回
2.2聊天模块(websocket_conn.cc)
2.2.1建立websocket连接
(1)握手
(3)发送"hello"信息
2.2.2处理聊天消息
(1)房间管理
(2)消息发送
2.2.3请求历史消息
2.2.4创建聊天室
(1)业务处理
(2)提供数据的api
3 分布式
上线前项目存在的问题
项目运行
1.git clone http://gitlab.0voice.com/2410_vip/github-chatroom.git
2.进入server目录登录MySQL,导入数据库(确保已经安装mysql)
3.安装第三方库:libjsoncpp-dev(用于解析和生成json数据)、libhiredis-dev(服务端需要作为客户端利用Redis提供的hiredis访问redis服务器)、uuid-dev( 项目需要使用UUID 生成功能)
-- 注意hiredis0.14.0与Ubuntu20冲突,需要改为0.14.1的版本
4.安装redis-server:redis的服务端也配置在同一台虚拟机上
5.server端编译运行 项目的后台搭建成功,可以处理前端业务
6.安装Node.js,为web客户服务端提供环境支持
7.npm install安装web客户服务端需要的组件,"npm run dev"运行web客户端。直接在浏览器访问。
项目框架
模块逻辑
同一个端口,如何区分websocket和http服务?
业务逻辑
数据json格式:
数据存储
MySQL: 存储 "用户信息" ,在chatroom数据库中对应的user表
Redis :存储 "房间消息" 和 "用户cookie"
房间消息:使用redis的stream结构,key为消息id,value为具体的聊天消息
其中消息id = 时间戳 + 序列号 序列号用于区分1毫秒内的消息。
用户cookie:使用redis的string结构,key为cookie,value为用户id。为一个临时键值对,默认有效期是1天,超过则会被redis删除,需要重新登陆。
从Redis获取消息举例:
Redis> XREVRANGE mystream (1736936461668-0 - COUNT 5
解析:
1、XREVRANGE : 命令,指逆序(新->旧)读取stream消息
2、mystream :stream是Redis中严格按照时间排序的数据结构,可以看作一个持续更新的记事本,而mystream 就是这个记事本的名字,除此之外可以用 order_stream
存订单,log_stream
存日志
1、1736936461668指从1970.1.1到发送消息时间的毫秒数
2、( 表示不包含1736936461668标识的这条消息,此消息后的5条消息
代码编写
1 情景在线
用户A注册账号,建立http服务,解析json数据,验证信息在数据库不存在且存入redis后,给用户A返回一个cookie,限时一天,用户A可以拿着cookie与服务器发送消息,期间无需再验证密码。登录成功后,用户A给服务器发送握手信息,请求Websocket服务,建立连接。服务端在检验cookie有效后,把用户A的ID和连接建立map映射,并且给其订阅默认的五个聊天室,随后加载五个聊天室的历史数据返回给用户A,尽管没有数据。用户A收到数据,知道连接建立成功。随后用户A在1号聊天室发布一条消息,服务器收到A通过websocket发来的websocket数据帧,按照websocket格式解析出frame,再通过json反序列化获取payload_data,把这个data存入redis数据库后,消息就算是落盘了,再执行一遍相反的包装操作,把这个websocket数据帧发送给其余也订阅了1号聊天室的用户,尽管这时候只有用户A。
用户B注册账号,经历上面的阶段后,会在连接建立成功的时候看到1号聊天室用户A发送的消息,然后在里面也发送一条消息,这个时候经历上面的 decode - store - encode - send的阶段,A就会看到这条消息。
用户B随后创建一个6号聊天室,这个聊天室信息存入磁盘后,会自动把在线的用户A和用户B拉入到房间里,在服务器重启后,由于落盘了,6号聊天室也会变成默认聊天室,所有新登录的用户都会自动订阅这个聊天室,并且会看到历史聊天数据。
2 HTTP请求(http_handler.h)
1.接收到HTTP请求,通过http header选择 websocket或者 http服务
//http_handler.hvoid OnRead(Buffer* buf) {if(request_type_ == UNKNOWN) {const char *in_buf = buf->peek();int32_t len = buf->readableBytes();std::cout << "in_buf: " << in_buf << std::endl;auto headers = parseHttpHeaders(in_buf, len); //解析http头if (isWebSocketRequest(headers)) {// WebSocket 请求request_type_ = WEBSOCKET;http_conn_ = std::make_shared<CWebSocketConn>(tcp_conn_);http_conn_->setHeaders(headers);} else {// HTTP 请求request_type_ = HTTP;http_conn_ = std::make_shared<CHttpConn>(tcp_conn_);http_conn_->setHeaders(headers);}}// 将数据交给具体的处理器if(http_conn_)http_conn_->OnRead(buf);
}
2.1注册/登录模块(http_conn.c)
2.注册/登录是http服务,接下来需要解析HTTP请求的URL的内容,判断是登录还是注册
//http_conn.c
//解析HTTP请求的内容 根据请求的URL判断是登录还是注册void CHttpConn::OnRead(Buffer *buf) // CHttpConn业务层面的OnRead
{const char *in_buf = buf->peek();int32_t len = buf->readableBytes();http_parser_.ParseHttpContent(in_buf, len);if(http_parser_.IsReadAll()) {string url = http_parser_.GetUrlString();string content = http_parser_.GetBodyContentString();LOG_INFO << "url: " << url << ", content: " << content; if (strncmp(url.c_str(), "/api/login", 10) == 0) { // 登录_HandleLoginRequest(url, content);} else if(strncmp(url.c_str(), "/api/create-account", 18) == 0) { // 注册_HandleRegisterRequest(url, content);} else {//处理未匹配的URLchar *resp_content = new char[256];string str_json = "{\"code\": 1}"; uint32_t len_json = str_json.size();//暂时先放这里#define HTTP_RESPONSE_REQ \"HTTP/1.1 404 OK\r\n" \"Connection:close\r\n" \"Content-Length:%d\r\n" \"Content-Type:application/json;charset=utf-8\r\n\r\n%s"snprintf(resp_content, 256, HTTP_RESPONSE_REQ, len_json, str_json.c_str()); tcp_conn_->send(resp_content);}}}
附:下面3.1与3.2都会用到的一段代码,cookie的生成以及存入redis的相关代码:
//api_common.cc
int ApiSetCookie(string email, string &cookie){// 获取redis连接池CacheManager *cache_manager = CacheManager::getInstance();CacheConn *cache_conn = cache_manager->GetCacheConn("token"); //AUTO_REL_CACHECONN(cache_manager, cache_conn);if(!cache_conn) {LOG_ERROR << "get cache conn failed";return -1;}cookie = generateUUID(); //需要唯一性string ret = cache_conn->SetEx(cookie, 86400, email); //存入redis key为cookie value为email 86400s = 24hif(!ret.empty()) {return 0;} else {return -1;}
}
2.1.1注册(api_reg.cc)
(1)解析
解析注册请求post_data中的 JSON 数据
//api_reg.cc
int decodeRegisterJson(const std::string &str_json, string &username,string &email, string &password)
{ bool res;Json::Value root;Json::Reader jsonReader;res = jsonReader.parse(str_json, root); if (!res) {LOG_ERROR << "parse reg json failed ";return -1;}// 用户名if (root["username"].isNull()) {LOG_ERROR << "username null";return -1;}username = root["username"].asString();//邮箱 if (root["email"].isNull()) {LOG_WARN << "email null";} else {email = root["email"].asString();}//密码if (root["password"].isNull()) {LOG_ERROR << "password null";return -1;}password = root["password"].asString();return 0;
}
(2)验证
验证用户信息(如用户名和邮箱是否已存在),并将用户信息(密码加密后)存储到数据库中。
//api_reg.cc
int registerUser(string &username, string &email, string &password, api_error_id &error_id) {int ret = -1;//创建连接池和MySQL连接CDBManager *db_manager = CDBManager::getInstance();CDBConn *db_conn = db_manager->GetDBConn("chatroom_master");AUTO_REL_DBCONN(db_manager, db_conn);if(!db_conn) {LOG_ERROR << "GetDBConn(chatroom_master) failed" ;return 1;}// 先查询 用户名 邮箱是否存在 如果存在就报错string str_sql = FormatString("select id, username, email from users where username='%s' or email='%s' ", username.c_str(), email.c_str());CResultSet *result_set = db_conn->ExecuteQuery(str_sql.c_str());if(result_set && result_set->Next()) {if(result_set->GetString("username")) {if(result_set->GetString("username") == username) {error_id = api_error_id::username_exists;LOG_WARN << "id: " << result_set->GetInt("id") << ", username: " << username << " 已经存在";}}if(result_set->GetString("email")) {if(result_set->GetString("email") == email) {error_id = api_error_id::email_exists;LOG_WARN << "id: " << result_set->GetInt("id") << ", email: " << email << " 已经存在";}}delete result_set;return -1;}//注册账号// 随机数生成盐值string salt = RandomString(16); //随机混淆码MD5 md5(password+salt);string password_hash = md5.toString();LOG_INFO << "salt: " << salt;//插入语句str_sql = "insert into users (`username`,`email`,`password_hash`,`salt`) values(?,?,?,?)";LOG_INFO << "执行: " << str_sql;// 预处理方式写入数据CPrepareStatement *stmt = new CPrepareStatement();if (stmt->Init(db_conn->GetMysql(), str_sql)) {uint32_t index = 0;stmt->SetParam(index++, username);stmt->SetParam(index++, email);stmt->SetParam(index++, password_hash);stmt->SetParam(index++, salt);bool bRet = stmt->ExecuteUpdate(); //真正提交要写入的数据if (bRet) { //提交正常返回 trueret = 0;LOG_INFO << "insert user_id: " << db_conn->GetInsertId() << ", username: " << username ;} else {LOG_ERROR << "insert users failed. " << str_sql;ret = 1;}}delete stmt;return ret;
}
(3)返回
注册成功返回cookie,失败返回已存在。(在http_conn.c中有调用下面的函数)
//api_reg.cc
int ApiRegisterUser(std::string &post_data, std::string &resp_data){string username;string email;string password;int ret = decodeRegisterJson(post_data, username, email, password);if(ret < 0) {// 参数问题的encodeencdoeRegisterJson(api_error_id::bad_request, "请求参数不全", resp_data);return -1;}// 封装registerUser(username email password)api_error_id error_id = api_error_id::bad_request;ret = registerUser(username, email, password, error_id);//返回注册结果if(ret == 0) {// 注册成功 是需要产生cookieApiSetCookie(email, resp_data);} else {// 注册问题的encodeencdoeRegisterJson(error_id, api_error_id_to_string(error_id), resp_data);}//正常注册 返回cookie 在resp_data中//异常 {} EMAIL_EXISTS 也在resp_data中 返回给http_conn.cc中传入的resp_datareturn ret;
}
2.1.2登录(api_login.cc)
(1)解析
解析登录请求post_data中的登录请求数据
//api_login.cc
// / 解析登录信息
int decodeLoginJson(const std::string &str_json, string &email,string &password) {bool res;Json::Value root;Json::Reader jsonReader;res = jsonReader.parse(str_json, root);if (!res) {LOG_ERROR << "parse login json failed ";return -1;}if (root["email"].isNull()) {LOG_ERROR << "email null";return -1;}email = root["email"].asString();//密码if (root["password"].isNull()) {LOG_ERROR << "password null";return -1;}password = root["password"].asString();return 0;
}
(2)验证
验证用户的邮箱和密码是否正确
//api_login.cc
int verifyUserPassword(string &email, string &password) {int ret = -1;// 只能使用emailCDBManager *db_manager = CDBManager::getInstance();CDBConn *db_conn = db_manager->GetDBConn("chatroom_slave");AUTO_REL_DBCONN(db_manager, db_conn); //析构时自动归还连接if(!db_conn) {LOG_ERROR << "get db conn failed";return -1;}//根据email查询密码string strSql = FormatString("select username, password_hash, salt from users where email='%s'", email.c_str());LOG_INFO << "执行:" << strSql;CResultSet *result_set = db_conn->ExecuteQuery(strSql.c_str());if (result_set && result_set->Next()) { //如果存在则读取密码 string username = result_set->GetString("username");string db_password_hash = result_set->GetString("password_hash");string salt = result_set->GetString("salt");MD5 md5(password + salt); // 计算出新的密码string client_password_hash = md5.toString(); // 计算出新的密码if(db_password_hash == client_password_hash) {LOG_INFO << "username: " << username << " verify ok";ret = 0;} else {LOG_INFO << "username: " << username << " verify failed";ret = -1;}} else {ret = -1;}if(result_set)delete result_set;return ret;
}
(3)返回
登录成功返回cookie,失败返回错误信息。
//api_login.cc
int ApiUserLogin(std::string &post_data, std::string &resp_data){//string email;string password;// json反序列化// 解析jsonif (decodeLoginJson(post_data, email, password) < 0) {LOG_ERROR << "decodeRegisterJson failed";encodeLoginJson(api_error_id::bad_request, "email or password no fill", resp_data);return -1;}//校验邮箱 密码int ret = verifyUserPassword(email, password);if(ret == 0) {// 设置cookieApiSetCookie(email, resp_data);} else {encodeLoginJson(api_error_id::bad_request, "email password no match", resp_data);}return ret;
}
2.2聊天模块(websocket_conn.cc)
客户端对服务器的主动发送消息,有可能是 "握手信号" ,有可能非握手信号。非握手信号有:"具体消息" 和 "历史消息的查看"
//websocket_conn.cc
void CWebSocketConn::OnRead(客户端发送来的Websocket数据帧)
{if(连接还没建立){-> 2.2.1建立websocket连接}else{先解析Websocket数据帧,拿到其中的payload json字段if(json中type字段 == "clientMessages")-> 2.2.2处理用户消息else if(json中type字段 == "requestRoomHistory")-> 2.2.3请求历史消息else if(json中type字段 == "clientCreateRoom")-> 2.2.4创建聊天室}
}
2.2.1建立websocket连接
(1)握手
获取Websocket数据帧中Sec-WebSocket-Key
字段的值,服务器基于它生成一个 Sec-WebSocket-Accept
值,返回给客户端以完成握手。
//websocket_conn.cc
void CWebSocketConn::OnRead(Buffer* buf){if (!handshake_completed_) {// 客户端 服务端 握手string request = buf->retrieveAllAsString();LOG_INFO << "request" << request;size_t key_start = request.find("Sec-WebSocket-Key: ");if(key_start != std::string::npos) {//说明能找到Sec-WebSocket-Key: key_start += 19; //19是 Sec-WebSocket-Key: 的长度size_t key_end = request.find("\r\n", key_start); // key_start -- key_end = Y90uUZxPYAVrEFJrtYEfCg==std::string key = request.substr(key_start, key_end - key_start);// base 64编码(SHA1(key + 固定str))string response = generateWebSocketHandshakeResponse(key); send(response); //握手成功handshake_completed_ = true;LOG_INFO << "handshake_completed_ ok";
}...cookie检验过程}
(2)检验cookie
//websocket_conn.ccvoid CWebSocketConn::OnRead(Buffer* buf){...握手过程string Cookie = headers_["Cookie"];LOG_INFO << "Cookie: " << Cookie;string sid;if(!Cookie.empty()) {sid = extractSid(Cookie); }LOG_INFO << "sid: " << sid;// 根据cookie-sid ,获取 用户名 用户id Emailstring email;if(Cookie.empty() || ApiGetUserInfoByCookie(username_, userid_, email, sid) < 0) {string reason;if(email.empty()) {reason = "cookie validation failed";} else {reason = "db may be has issue";}// 校验失败sendCloseFrame(1008, reason);} else {//校验成功// 把连接加入 s_user_ws_conn_map 统一管理websocket连接LOG_INFO << "cookie validation ok";s_mtx_user_ws_conn_map_.lock();s_user_ws_conn_map.insert({userid_,shared_from_this()}); // 同样userid连接可能已经存在了s_mtx_user_ws_conn_map_.unlock();//获取用户的聊天室列表std::vector<Room> &room_list = GetRoomList(); for(int i = 0; i < room_list.size(); i++) {rooms_map_.insert({room_list[i].room_id, room_list[i]});}// 发送"hello"给 客户端 sendHelloMessage();}}
(3)发送"hello"信息
初始化客户端的界面,展示用户信息和聊天室的历史消息。
数据流:从Redis获取消息,json序列化为payload,再封装成websocket格式的frame,一个WebSocket 数据帧,发送。
//websocket_conn.cc
//获取用户信息和用户加入的聊天室的历史消息存入结构体,根据结构体字段值,构造json发送给客户端
int CWebSocketConn::sendHelloMessage()
{Json::Value root;root["type"] = "hello";Json::Value payload;Json::Value me;me["id"] = userid_;me["username"] = username_;payload["me"] = me;Json::Value rooms;int it_index = 0;//每个用户都有自己的rooms_map_// 遍历加入的房间,获取每个房间的历史消息for (auto it = rooms_map_.begin(); it != rooms_map_.end(); ++it){Room &room_item = it->second;string last_message_id; MessageBatch message_batch;
到Redis获取房间的消息,存入message_batch结构中ApiGetRoomHistory(room_item, message_batch); LOG_INFO << "room: " << room_item.room_name << ", history_last_message_id:" << it->second.history_last_message_id;Json::Value room; room["id"] = room_item.room_id; //聊天室主题名称room["name"] = room_item.room_name; //聊天室主题名称 先设置成一样的room["hasMoreMessages"] = message_batch.has_more;Json::Value messages; for(int j = 0; j < message_batch.messages.size(); j++) {Json::Value message;Json::Value user;message["id"] = message_batch.messages[j].id;message["content"] = message_batch.messages[j].content; user["id"] = userid_;user["username"] = username_;message["user"] = user;message["timestamp"] = (Json::UInt64)message_batch.messages[j].timestamp;messages[j] = message;}if(message_batch.messages.size() > 0)room["messages"] = messages;else room["messages"] = Json::arrayValue; //不能为NULL,否则前端报异常rooms[it_index] = room;it_index++;}payload["rooms"] = rooms;root["payload"] = payload;Json::FastWriter writer;string str_json = writer.write(root);// 打印 JSON 字符串LOG_INFO << "Serialized JSON: " << str_json;string hello = buildWebSocketFrame(str_json);send(hello);
}
2.2.2处理聊天消息
(1)房间管理
实现一个PubSubService类,全局房间订阅服务,提供获取所有房间ID,增加/删除房间,加入/退出房间,在房间内发布消息 六个接口。
//pub_sub_service.h
//单个房间信息
class RoomTopic
{
public:RoomTopic(const string &room_id, const string &room_topic, uint32_t creator_id) {room_id_ = room_id;room_topic_ = room_topic;creator_id_ = creator_id;}~RoomTopic() {user_ids_.clear();}void AddSubscriber(uint32_t userid) {user_ids_.insert(userid);}void DeleteSubscriber(uint32_t userid) {user_ids_.erase(userid);}std::unordered_set<uint32_t> &getSubscribers() {return user_ids_;}private:string room_id_;string room_topic_;int creator_id_;std::unordered_set<uint32_t> user_ids_;
};using RoomTopicPtr = std::shared_ptr<RoomTopic>;
using PubSubCallback = std::function<void(const std::unordered_set<uint32_t> user_ids)>;//全局的发布订阅服务
class PubSubService
{
public://单例模式static PubSubService &GetInstance() {static PubSubService instance;return instance;}PubSubService() {}~PubSubService(){}bool AddRoomTopic(const string &room_id, const string &room_topic, int creator_id) {std::lock_guard<std::mutex> lck(room_topic_map_mutex_);if (room_topic_map_.find(room_id) != room_topic_map_.end()) {return false;}RoomTopicPtr room_topic_ptr = std::make_shared<RoomTopic>(room_id, room_topic, creator_id);room_topic_map_[room_id] = room_topic_ptr;return true;}void DeleteRoomTopic(const string &room_id) {std::lock_guard<std::mutex> lck(room_topic_map_mutex_);if (room_topic_map_.find(room_id) != room_topic_map_.end()) {return;}room_topic_map_.erase(room_id);}在websocket_conn.cc里,建立连接成功后,不仅会把userid加入s_user_ws_conn_map,还会用下面的函数订阅GetRoomList()里面默认存在的五个房间bool AddSubscriber(const string &room_id, uint32_t userid) {std::lock_guard<std::mutex> lck(room_topic_map_mutex_);if (room_topic_map_.find(room_id) == room_topic_map_.end()) {return false;}room_topic_map_[room_id]->AddSubscriber(userid);return true;}void DeleteSubscriber(const string &room_id, uint32_t userid) {std::lock_guard<std::mutex> lck(room_topic_map_mutex_);if (room_topic_map_.find(room_id) == room_topic_map_.end()) {return;}room_topic_map_[room_id]->DeleteSubscriber(userid);}void PublishMessage(const string &room_id, PubSubCallback callback) {std::unordered_set<uint32_t> user_ids;{std::lock_guard<std::mutex> lck(room_topic_map_mutex_);if (room_topic_map_.find(room_id) == room_topic_map_.end()) {return;}user_ids = room_topic_map_[room_id]->getSubscribers(); //获取当前房间的所有订阅者}callback(user_ids); //发送给所有订阅者}static std::vector<Room> &GetRoomList(); //获取当前的房间列表
private:std::unordered_map<string, RoomTopicPtr> room_topic_map_;std::mutex room_topic_map_mutex_;
};
(2)消息发送
客户端发送消息到服务端,服务端要把消息解析出来(按照websocket格式解析出frame,再通过json反序列化获取payload_data),写到redis里面,并且重新封装消息后,通过PublishMessage发送到房间里。具体的流程是通过回调函数,将消息发给每一个订阅了 房间的用户。
//websocket_conn.cc
int CWebSocketConn::handleClientMessages(Json::Value &root)
{// Json反序列化把json消息解析出来string room_id;if(root["payload"].isNull()) {return -1;}Json::Value payload = root["payload"];if(payload["roomId"].isNull()) {return -1;}room_id = payload["roomId"].asString();if(payload["messages"].isNull()) {return -1;}Json::Value arrayObj = payload["messages"];if(arrayObj.isNull()) {return -1;}std::vector<Message> msgs; //消息具体内容 和相关发送信息uint64_t timestamp = getCurrentTimestamp();for(int i = 0; i < arrayObj.size(); i++) {Json::Value message = arrayObj[i];Message msg;msg.content = message["content"].asString();msg.timestamp = timestamp;msg.user_id = userid_;msg.username = username_;msgs.push_back(msg);}//持久化 写到redis里面ApiStoreMessage(room_id, msgs);//重新组织一个jsonroot = Json::Value();payload = Json::Value();root["type"] = "serverMessages";payload["roomId"] = room_id;Json::Value messages;for(int j = 0; j < msgs.size(); j++) {Json::Value message;Json::Value user;message["id"] = msgs[j].id;message["content"] = msgs[j].content; user["id"] = userid_;user["username"] = username_;message["user"] = user;message["timestamp"] = (Json::UInt64)msgs[j].timestamp;messages[j] = message;}if(msgs.size() > 0)payload["messages"] = messages;elsepayload["messages"] = Json::arrayValue;root["payload"] = payload;Json::FastWriter writer;string str_json = writer.write(root);//json包装成Websocket帧std::string response = buildWebSocketFrame(str_json);//回调函数具体执行 将消息发送给房间的所有订阅者auto callback = [&response, &room_id, this](const std::unordered_set<uint32_t> user_ids) {for (uint32_t userId: user_ids){CHttpConnPtr ws_conn_ptr = nullptr;{std::lock_guard<std::mutex> ulock(s_mtx_user_ws_conn_map_); //自动释放ws_conn_ptr = s_user_ws_conn_map[userId];//获取每个用户ID对应的连接}if(ws_conn_ptr) {ws_conn_ptr->send(response);
//如果找到用户的 WebSocket 连接,则调用 send 方法,将消息发送给该用户。} else{LOG_WARN << "can't find userid: " << userId;}/* code */}};// 广播给所有人 走回调函数PubSubService::GetInstance().PublishMessage(room_id, callback);return 0;
}
2.2.3请求历史消息
返回客户端要求的历史消息条数,count是从客户端发送的 JSON 数据中提取的,如果房间中剩余的消息少于 count,则返回所有剩余的消息。如果房间中剩余的消息多于 count,则只返回 count 条消息。客户端也可以通过动态设置count的值实现分页加载。
//websocket_conn.cc
//如果hasMoreMessages为ture说明还有更多历史消息可以继续获取,如果hasMoreMessages为false,则没有更多历史消息可以获取。
int CWebSocketConn::handleRequestRoomHistory(Json::Value &root) {std::string roomId = root["payload"]["roomId"].asString();std::string firstMessageId = root["payload"]["firstMessageId"].asString();int count = root["payload"]["count"].asInt();// 从数据库获取历史消息string last_message_id;MessageBatch message_batch;Room &room_item = rooms_map_[roomId];room_item.history_last_message_id = firstMessageId;// 获取房间的消息ApiGetRoomHistory(room_item, message_batch, count); root = Json::Value(); //重新置空// 构建响应root["type"] = "serverRoomHistory";Json::Value payload;payload["roomId"] = room_item.room_id; //聊天室idpayload["name"] = room_item.room_name; //聊天室名称payload["hasMoreMessages"] = message_batch.has_more; //是否还有更多消息// 构建消息数组Json::Value messages(Json::arrayValue);for (const auto& msg : message_batch.messages) {Json::Value message;Json::Value user;message["id"] = msg.id;message["content"] = msg.content;user["id"] = (Json::Int64)msg.user_id;user["username"] = msg.username;message["user"] = user;message["timestamp"] = (Json::UInt64)msg.timestamp;messages.append(message);}if(message_batch.messages.size() > 0)payload["messages"] = messages;else payload["messages"] = Json::arrayValue; //不能为NULL,否则前端报异常root["payload"] = payload;std::string response = buildWebSocketFrame(root.toStyledString());send(response);return 0;
}
2.2.4创建聊天室
用户可以创建聊天室,所以我们需要把原先写死的room_list,改成一个数据库中的数据表,在数据库中存储起来,所以对应的也就会有很多聊天室相关的字段:room_id(UUID生成),room_name,room_creater。之后在服务器重启的时候,就会从mysql中读出所有的room_list,重新创建房间。
当用户创建聊天室,服务端收到相关的websocket请求,把json解析出来后,分配房间id,通过ApiCreateRoom()写入到mysql数据库当中(不是redis),再通过PubSubService类的两个函数把新建的聊天室加入到 room_list(负责存储房间信息,给用户展示)和room_topic_map(负责管理订阅关系,发布消息),随后让每一个用户都强制订阅这个聊天室,并且给他们send有新聊天室创建的消息。
(1)业务处理
//websocket_conn.cc
//请求格式: {"type":"clientCreateRoom","payload":{"roomName":"dpdk教程"}}
//响应格式: {"type":"serverCreateRoom","payload":{"roomId":"3bb1b0b6-e91c-11ef-ba07-bd8c0260908d", "roomName":"dpdk教程"}}
int CWebSocketConn::handleClientCreateRoom(Json::Value &root)
{LOG_INFO << "handleClientCreateRoom into";// 把消息解析出来string roomId;string roomName;Json::Value payload = root["payload"];if(payload.isNull()) {LOG_WARN << "payload is null";return -1;}// 解析json 解析聊天室的名字 if(payload["roomName"].isNull()) {LOG_WARN << "roomName is null";return -1;}roomName = payload["roomName"].asString();// 分配房间idroomId = generateUUID();LOG_INFO << "handleClientCreateRoom, roomName: " << roomName << ", roomId: " << roomId;//存储到数据库std::string error_msg;bool ret = ApiCreateRoom(roomId, roomName, userid_, error_msg);if(!ret ) {LOG_ERROR << "ApiCreateRoom failed: " << error_msg;return -1;}PubSubService::GetInstance().AddRoomTopic(roomId, roomName, userid_);// 把新建的聊天室加入到 room_listRoom room;room.room_id = roomId;room.room_name = roomName;room.create_time = getCurrentTimestamp();room.creator_id = userid_;PubSubService::AddRoom(room);//每个人都强制订阅这个聊天室{std::lock_guard<std::mutex> lock(s_mtx_user_ws_conn_map_);rooms_map_.insert({roomId, room});for(auto it = s_user_ws_conn_map.begin(); it != s_user_ws_conn_map.end(); ++it) {//房间id, 用户id 订阅LOG_INFO << "AddSubscriber: " << roomId << ", userid: " << it->first;PubSubService::GetInstance().AddSubscriber(roomId, it->first);}}//发送消息给所有人,告诉有新的聊天室创建了//先序列化消息// Json::Value root;root = Json::Value(); //重新置空// Json::Value payload;payload = Json::Value(); //重新置空root["type"] = "serverCreateRoom";payload["roomId"] = roomId;payload["roomName"] = roomName;root["payload"] = payload;//json序列化Json::FastWriter writer;string json_str = writer.write(root);LOG_INFO << "serverCreateRoom: " << json_str;string response = buildWebSocketFrame(json_str);//发送 创建聊天室的 roomId 和 roomName,通知所有订阅者有新的聊天室可用auto callback = [&response, &roomId, this](const std::unordered_set<uint32_t> &user_ids) {LOG_INFO << "room_id:" << roomId << ", callback " << ", user_ids.size(): " << user_ids.size();for (uint32_t userId: user_ids) {CHttpConnPtr ws_conn_ptr = nullptr;{std::lock_guard<std::mutex> ulock(s_mtx_user_ws_conn_map_); //自动释放ws_conn_ptr = s_user_ws_conn_map[userId];}if(ws_conn_ptr) {ws_conn_ptr->send(response);} else {LOG_WARN << "can't find userid: " << userId;}}};PubSubService::GetInstance().PublishMessage(roomId, callback);return 0;
}
(2)提供数据的api
//api_room.cc
#include "api_room.h"
#include <sstream>
bool ApiCreateRoom(const std::string& room_id, const std::string& room_name, int creator_id, std::string& error_msg)
{CDBManager* db_manager = CDBManager::getInstance();CDBConn* db_conn = db_manager->GetDBConn("chatroom_master");if (!db_conn) {error_msg = "无法获取数据库连接";return false;}AUTO_REL_DBCONN(db_manager, db_conn);std::stringstream ss;ss << "INSERT INTO room_info (room_id, room_name, creator_id) VALUES ('"<< room_id << "', '"<< room_name << "', "<< creator_id << ")";if (!db_conn->ExecuteUpdate(ss.str().c_str(), true)) {error_msg = "创建聊天室失败";return false;}return true;
}bool ApiGetRoomInfo(const std::string& room_id, std::string& room_name, int& creator_id,std::string& create_time,std::string& update_time,std::string& error_msg){CDBManager* db_manager = CDBManager::getInstance();CDBConn* db_conn = db_manager->GetDBConn("chatroom_slave");if (!db_conn) {error_msg = "无法获取数据库连接";return false;}AUTO_REL_DBCONN(db_manager, db_conn);std::stringstream ss;ss << "SELECT room_name, creator_id, create_time, update_time "<< "FROM room_info WHERE room_id='" << room_id << "'";CResultSet* result_set = db_conn->ExecuteQuery(ss.str().c_str());if (!result_set) {error_msg = "查询聊天室信息失败";return false;}if (result_set->Next()) {room_name = result_set->GetString("room_name");creator_id = result_set->GetInt("creator_id");create_time = result_set->GetString("create_time");update_time = result_set->GetString("update_time");delete result_set;return true;}delete result_set;error_msg = "聊天室不存在";return false;} bool ApiGetAllRooms(std::vector<Room>& rooms, std::string& error_msg,const std::string& order_by)
{ CDBManager* db_manager = CDBManager::getInstance();CDBConn* db_conn = db_manager->GetDBConn("chatroom_slave");if (!db_conn) {error_msg = "无法获取数据库连接";return false;}AUTO_REL_DBCONN(db_manager, db_conn);std::stringstream ss;ss << "SELECT room_id, room_name, creator_id, create_time, update_time "<< "FROM room_info ORDER BY " << order_by;CResultSet* result_set = db_conn->ExecuteQuery(ss.str().c_str());if (!result_set) {error_msg = "查询聊天室列表失败";return false;}while (result_set->Next()) {Room room;room.room_id = result_set->GetString("room_id");room.room_name = result_set->GetString("room_name");room.creator_id = result_set->GetInt("creator_id");room.create_time = result_set->GetString("create_time");room.update_time = result_set->GetString("update_time");rooms.push_back(room);}delete result_set;return true;
}
3 分布式
如果有三台服务器分别部署在北京、上海、深圳,然后这三个地方分别有用户A、B、C订阅了1号聊天室,如果A在里面发消息,如何让不同服务器的B和C看见呢?如果后端需要发布公告在1号聊天室,如何让三台不同服务器上的用户看见呢?
首先所有服务器上有聊天室注册上线的时候,都应该在一个"注册服务发现中心"注册自己的信息,比如来自于哪一台服务器,由 "job"服务从注册服务发现中心里面拉取chatroom列表,发起连接,由"grpc"推送消息到chatroom,然后再推送到web客户端。
而如果是后端消息公告的话,会把消息推送到"logic"模块,然后再push发送到Kafka消息队列中,再由job服务从Kafka队列获取数据。其中logic和job服务都可以是多个并发的。
各模块作用:
logic:业务逻辑核心,用户登录注册管理,消息管理。
Kafka:缓存消息
job:指代一个任务线程
grpc:远程过程调用。比如可以调用比如在北京的服务器上编写好的函数,进而让job完成工作
注册服务发现中心:
(1)告知job,chatroom grpc远程服务器的地址。由chatroomID和服务器IP对应。
(2)告知chatroom,可用的logic grpc远程服务器的地址。例如,用户发送一条消息,chatroom
服务需要把消息交给 logic
服务进行处理和广播
nginx:可以在chatroom和Web客户端之间加上nginx反向代理,实现负载均衡,给Web客户端返回负载最低的服务器。
上线前项目存在的问题
出现的问题1:
1.为什么客户端一发消息或者发起创建房间请求,服务端就段错误
出现的问题2:
2.为什么我的/server/bin文件夹下面没有logic和job可执行文件