微服务即时通讯系统(服务端)——消息转发微服务设计与实现详解(5)
文章目录
- 微服务即时通讯系统(服务端)——消息转发微服务设计与实现详解(5)
- 项目概述
- 核心架构设计
- 1. 服务分层架构
- 2. 核心组件说明
- 2.1 服务实现类 `MsgTransmitServiceImpl`
- 2.2 数据访问层 `ChatSessionMemberTable`
- 2.3 服务构建器 `MsgTransmitServerBuilder`
- 3. 依赖服务集成
- 3.1 服务发现与注册
- 3.2 数据库配置
- 3.3 消息队列配置
- 消息流转流程
- 1. 接收消息请求
- 2. 处理流程
- 3. 数据流向
- 关键技术实现
- 1. 错误处理机制
- 2. 数据库事务管理
- 3. 服务治理
- 服务端详细设计
- 1.1 服务端入口 (main.cpp)
- 1.2 建造者模式详解
- 1.3 核心服务实现
- RPC服务实现类
- 业务逻辑流程
- 1.4 数据访问层
- 数据库表映射
- 数据访问封装
- 测试端详细设计
- 2.1 测试端入口 (测试代码)
- 2.2 测试用例设计
- 2.3 测试策略分析
- 测试覆盖点:
- 测试数据设计:
- 三、配置管理系统
- 3.1 服务端配置
- 3.2 测试端配置
- 3.3 构建流程
- 部署与运行
- 4.1 服务端启动
- 4.2 测试端执行
- CMakeLists.txt
- CMake构建配置
- 性能优化考虑
- 扩展性设计
微服务即时通讯系统(服务端)——消息转发微服务设计与实现详解(5)
项目概述
基于C++的分布式即时通讯系统中的消息转发微服务,负责处理用户消息的转发逻辑。该服务使用brpc框架构建RPC服务,通过etcd实现服务注册与发现,MySQL存储聊天会话成员信息,RabbitMQ进行消息发布。
gittee完整项目代码
核心架构设计
1. 服务分层架构
┌─────────────────────────────────────────┐
│ 消息转发微服务 │
├─────────────────────────────────────────┤
│ MsgTransmitServer (服务入口) │
├─────────────────────────────────────────┤
│ MsgTransmitServiceImpl (业务逻辑层) │
├─────────────────────────────────────────┤
│ ChatSessionMemberTable (数据访问层) │
├─────────────────────────────────────────┤
│ brpc + etcd + MySQL + RabbitMQ │
└─────────────────────────────────────────┘
2. 核心组件说明
2.1 服务实现类 MsgTransmitServiceImpl
这是业务逻辑的核心实现,继承自protobuf生成的RPC服务接口。
主要职责:
- 处理消息转发请求
- 调用用户服务获取发送者信息
- 查询会话成员列表
- 发布消息到消息队列
关键方法 GetTransmitTarget 流程:
void GetTransmitTarget(::google::protobuf::RpcController* controller,const ::bite_im::NewMessageReq* request,::bite_im::GetTransmitTargetRsp* response,::google::protobuf::Closure* done) override
{// 1. 请求参数解析std::string rid = request->request_id();std::string uid = request->user_id();std::string ssid = request->session_id();std::string chat_ssid = request->chat_session_id();// 2. 调用用户服务获取发送者信息bite_im::UserService_Stub user_stub(ptr.get());user_stub.GetUserInfo(&ctl, &user_req, &user_rsp, nullptr);// 3. 构建响应消息bite_im::MessageInfo* msg = response->mutable_message();msg->set_timestamp(time(nullptr));msg->set_chat_session_id(chat_ssid);msg->set_message_id(bite_im::uuid());msg->mutable_sender()->CopyFrom(sender);// 4. 查询会话成员std::vector<ChatSessionMember> users = mysql_csmember_->members(chat_ssid);for(auto& member : users)response->add_target_id_list(member.user_id());// 5. 发布到消息队列rmq_client_->Publisher(exchange_name_, routing_key_, msg->SerializeAsString());// 6. 返回成功响应response->set_success(true);
}
2.2 数据访问层 ChatSessionMemberTable
封装了对聊天会话成员表的数据库操作。
namespace bite_im
{typedef odb::query<bite_im::ChatSessionMember> ChatSessionMember_query;typedef odb::result<bite_im::ChatSessionMember> ChatSessionMember_result;class ChatSessionMemberTable{public:ChatSessionMemberTable(std::shared_ptr<odb::core::database>& db): db_(db){if(!db_check_nullptr()){LOG_ERROR("数据库未完成初始化!");return;}}bool db_check_nullptr(){return db_ != nullptr;}bool execute(std::vector<std::string>& sqls){odb::transaction t(db_->begin());try{ for(auto &sql : sqls){db_->execute(sql);}LOG_INFO("执行sql语句成功!");t.commit();return true;}catch (const std::exception& e){t.rollback();LOG_ERROR("ChatSessionMemberTable 插入失败-原因:{}", e.what());}return false;}//增加单个成员bool append(ChatSessionMember& member){odb::transaction t(db_->begin());try{db_->persist(member);t.commit();return true;}catch (const std::exception& e){t.rollback();LOG_ERROR("ChatSessionMember 添加单个成员触发异常, 会话id:{}, 异常原因:{}", member.session_id(), e.what());}return false;}//批量增加成员bool append(std::vector<ChatSessionMember>& members){odb::transaction t(db_->begin());try{for(auto& member : members)db_->persist(member);t.commit();return true;}catch (const std::exception& e){t.rollback();LOG_ERROR("ChatSessionMember 批量添加成员触发异常, 异常原因:{}", e.what());}return false;}//移除单个成员bool remove(ChatSessionMember& member){odb::transaction t(db_->begin());try{db_->erase_query<ChatSessionMember>(ChatSessionMember_query::session_id == member.session_id()&& ChatSessionMember_query::user_id == member.user_id());t.commit();return true;}catch (const std::exception& e){t.rollback();LOG_ERROR("ChatSessionMember 移除单个成员触发异常, 会话id:{}, 异常原因:{}", member.session_id(), e.what());}return false;}//移除会话所有成员bool remove(const std::string& ssid){odb::transaction t(db_->begin());try{db_->erase_query<ChatSessionMember>(ChatSessionMember_query::session_id == ssid);t.commit();return true;}catch (const std::exception& e){t.rollback();LOG_ERROR("ChatSessionMember 移除会话成员触发异常, 会话id:{}, 异常原因:{}", ssid, e.what());}return false;}//获取会话所有成员std::vector<ChatSessionMember> members(const std::string& ssid){odb::transaction t(db_->begin());try{std::vector<ChatSessionMember> ret;ChatSessionMember_result result(db_->query<ChatSessionMember>(ChatSessionMember_query::session_id == ssid));for(auto& m : result){ret.push_back(m);}t.commit();return ret;}catch (const std::exception& e){t.rollback();LOG_ERROR("ChatSessionMember 移除会话成员触发异常, 会话id:{}, 异常原因:{}", ssid, e.what());}return {};}~ChatSessionMemberTable(){}private:std::shared_ptr<odb::core::database> db_;};
};
支持的操作:
append()- 添加单个或多个成员remove()- 移除成员members()- 获取会话所有成员
通过ODB构建表
namespace bite_im
{#pragma db object table("chat_session_member")class ChatSessionMember{friend class odb::access;public:ChatSessionMember(){}ChatSessionMember(const std::string& ssid, const std::string& uid): session_id_(ssid), user_id_(uid){}//会话id的获取与使用std::string session_id(){return session_id_;}void session_id(const std::string& ssid){session_id_ = ssid;}//用户id的获取与使用std::string user_id(){return user_id_;}void user_id(const std::string& uid){user_id_ = uid;}~ChatSessionMember(){}private:#pragma db id autounsigned long id_; //自增主键#pragma db type("varchar(64)") indexstd::string session_id_; //会话id#pragma db type("varchar(64)")std::string user_id_; //用户id};
};
数据库表结构:
CREATE TABLE chat_session_member (id BIGINT AUTO_INCREMENT PRIMARY KEY,session_id VARCHAR(64) NOT NULL,user_id VARCHAR(64) NOT NULL
);
2.3 服务构建器 MsgTransmitServerBuilder
采用建造者模式,负责服务的组装和初始化。
构建步骤:
- 创建服务管理器
- 初始化MySQL连接
- 设置服务发现
- 配置RabbitMQ
- 启动RPC服务
- 注册到etcd
class MsgTransmitServerBuilder
{public:MsgTransmitServerBuilder() = default;void make_register_object(const std::string& reghost, const std::string &service_name, const std::string &access_host){try{register_client_ = std::make_shared<Etcd_Tool::RegisterEtcd>(reghost);Etcd_Tool::resinfo_t resinfo;resinfo.service_name_ = service_name;resinfo.service_addr_ = access_host;// 检查注册结果if (!register_client_->Register(resinfo)){LOG_ERROR("Etcd服务注册失败");std::abort();}}catch (const std::exception& e){LOG_ERROR("make_register_object异常-{}", e.what());std::abort();}}void make_discovery_object(const std::string& reghost, const std::string &service_name){try{discovery_client_ = std::make_shared<Etcd_Tool::MonitorEtcd>(reghost);Etcd_Tool::monitor_t moninfo;moninfo.monitor_path_ = service_name;moninfo.type_ = Etcd_Tool::DIR;moninfo.put_on_ = std::bind(&brpcChannelTool::ServiceManager::Online, service_manager_.get(), std::placeholders::_1, std::placeholders::_2);moninfo.put_off_ = std::bind(&brpcChannelTool::ServiceManager::Offline, service_manager_.get(), std::placeholders::_1, std::placeholders::_2);// 检查注册结果if (!discovery_client_->PushMonitor(moninfo)){LOG_ERROR("Etcd服务发现-{}失败", service_name);std::abort();}}catch (const std::exception& e){LOG_ERROR("make_discovery_object异常-{}失败", e.what());abort();}}void make_channel_manager_object(const std::string &service_name){service_manager_ = std::make_shared<brpcChannelTool::ServiceManager>(service_name);user_service_name_ = service_name;}void make_mysql_object(const std::string& dbuser, const std::string& dbpassword, const std::string& dbname, const std::string& dbhost, int32_t dbport, const std::string& charset = "utf8mb4", int32_t max_con_num = 20){try{mysql_client_ = bite_im::ODBFactory().create(dbuser,dbpassword,dbname,dbhost,dbport,"", // socketcharset, // charset0, // client_flagsmax_con_num, // max_con_num5 // init_con_num);}catch (const std::exception& e){LOG_ERROR("make_mysql_object失败, 原因:{}", e.what());std::abort();}}void make_rmq_object(const std::string& user, const std::string& password, const std::string& host, const int32_t& port, const std::string& exchange_name, const std::string& queue_name, const std::string& routing_key){try{rmq_client_ = std::make_shared<RMQTool::RMQClient>(user, password, host, port);exchange_name_ = exchange_name;queue_name_ = queue_name;routing_key_ = routing_key;rmq_client_->DeclareComponents(exchange_name, queue_name, routing_key);}catch (const std::exception& e){LOG_ERROR("make_rmq_object失败, 原因:{}", e.what());std::abort();}}void make_rpc_object(in_port_t port, int timeout_sec, size_t threadnums){if(!mysql_client_){LOG_WARN("mysql client没有初始化!");abort();}if(!rmq_client_){LOG_WARN("rmq client没有初始化!");abort();}if(!discovery_client_){LOG_WARN("discovery_client 用户子服务监视 没有初始化!");abort();}if(exchange_name_.empty()){LOG_WARN("交换机 没有初始化!");abort();}if(queue_name_.empty()){LOG_WARN("队列 没有初始化!");abort();}if(routing_key_.empty()){LOG_WARN("路由密钥 没有初始化!");abort();}try{brpcServer_ = std::make_shared<brpc::Server>();brpc::ServerOptions op;op.idle_timeout_sec = timeout_sec;op.num_threads = threadnums;// 使用配置的存储路径MsgTransmitServiceImpl *msg_service = new MsgTransmitServiceImpl(mysql_client_, service_manager_, user_service_name_, rmq_client_, exchange_name_,queue_name_, routing_key_);if (brpcServer_->AddService(msg_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE) != 0) {LOG_ERROR("添加Rpc服务失败!");delete msg_service; // 避免泄漏std::abort();}if (brpcServer_->Start(port, &op) != 0) {LOG_ERROR("启动服务失败!");std::abort();}}catch (const std::exception& e){LOG_ERROR("make_rpc_object失败!-{}", e.what());}}std::shared_ptr<MsgTransmitServer> build(){if(!register_client_){LOG_ERROR("Etcd注册器未初始化!");std::abort();}if(!brpcServer_){LOG_ERROR("brpc服务未初始化!");std::abort();}return std::make_shared<MsgTransmitServer>(register_client_, discovery_client_, brpcServer_, service_manager_, mysql_client_);}private:std::shared_ptr<Etcd_Tool::RegisterEtcd> register_client_;std::shared_ptr<Etcd_Tool::MonitorEtcd> discovery_client_;std::shared_ptr<brpc::Server> brpcServer_;//服务管理器,用于调用用户微服务获取用户数据std::shared_ptr<brpcChannelTool::ServiceManager> service_manager_;//mysqlstd::shared_ptr<odb::core::database> mysql_client_;//用户微服务名std::string user_service_name_;//RabbitMQ进行消息发布std::shared_ptr<RMQTool::RMQClient> rmq_client_;//交换机和路由密钥std::string exchange_name_;std::string queue_name_;std::string routing_key_;
};
3. 依赖服务集成
3.1 服务发现与注册
// 服务注册
make_register_object(FLAGS_registry_host,FLAGS_base_service + FLAGS_instance_name,FLAGS_access_host);// 服务发现(用户服务)
make_discovery_object(FLAGS_registry_host,FLAGS_user_service_name);
3.2 数据库配置
make_mysql_object(FLAGS_dbuser, FLAGS_dbpassword, FLAGS_dbname,FLAGS_dbhost, FLAGS_dbport, FLAGS_dbcharset, FLAGS_db_con_num);
3.3 消息队列配置
make_rmq_object(FLAGS_rmquser, FLAGS_rmqpassword, FLAGS_rmq_host,FLAGS_rmq_port, FLAGS_exchange_name, FLAGS_queue_name, FLAGS_routing_key);
消息流转流程
1. 接收消息请求
客户端 → 网关 → 消息转发服务 (GetTransmitTarget)
2. 处理流程
1. 验证请求参数
2. 调用用户服务获取发送者信息
3. 生成消息ID和时间戳
4. 查询会话所有成员
5. 构建完整消息体
6. 发布到RabbitMQ
7. 返回目标用户列表
3. 数据流向
消息转发服务 → RabbitMQ → 消息推送服务 → 目标用户
关键技术实现
1. 错误处理机制
采用RAII和异常处理确保资源安全:
auto err_response = [this, response](const std::string &errmsg) -> void {response->set_success(false);response->set_errmsg(errmsg);return;
};brpc::ClosureGuard guard(done); // 确保done被调用
2. 数据库事务管理
使用ODB的事务机制保证数据一致性:
odb::transaction t(db_->begin());
try {// 数据库操作t.commit();
} catch (const std::exception& e) {t.rollback();// 错误处理
}
3. 服务治理
通过etcd实现服务的动态发现和负载均衡:
// 服务上线处理
moninfo.put_on_ = std::bind(&brpcChannelTool::ServiceManager::Online,service_manager_.get(), std::placeholders::_1, std::placeholders::_2);// 服务下线处理
moninfo.put_off_ = std::bind(&brpcChannelTool::ServiceManager::Offline,service_manager_.get(), std::placeholders::_1, std::placeholders::_2);
服务端详细设计
1.1 服务端入口 (main.cpp)
int main(int argc, char* argv[])
{// 1. 命令行参数解析gflags::ParseCommandLineFlags(&argc, &argv, true);// 2. 日志系统初始化LogModule::Log::Init(FLAGS_log_mode, FLAGS_log_file, FLAGS_log_level, FLAGS_log_output_mode);// 3. 建造者模式构建服务bite_im::MsgTransmitServerBuilder mtsb;// 4. 分层初始化各组件mtsb.make_channel_manager_object(FLAGS_user_service_name); // 服务管理mtsb.make_mysql_object(...); // 数据库mtsb.make_discovery_object(...); // 服务发现mtsb.make_rmq_object(...); // 消息队列mtsb.make_rpc_object(...); // RPC服务mtsb.make_register_object(...); // 服务注册// 5. 构建并启动服务std::shared_ptr<bite_im::MsgTransmitServer> server = mtsb.build();server->start(); // 阻塞运行直到收到退出信号return 0;
}
1.2 建造者模式详解
MsgTransmitServerBuilder 采用建造者模式,确保服务组件的正确初始化顺序:
class MsgTransmitServerBuilder {
public:// 构建步骤方法void make_channel_manager_object(const std::string &service_name);void make_mysql_object(...);void make_discovery_object(...);void make_rmq_object(...);void make_rpc_object(...);void make_register_object(...);// 最终构建方法std::shared_ptr<MsgTransmitServer> build();
};
初始化顺序的重要性:
- 先创建服务管理器(依赖发现)
- 再初始化数据库(数据存储基础)
- 然后设置服务发现(依赖其他服务)
- 接着配置消息队列(消息传输)
- 最后启动RPC服务和服务注册(对外提供服务)
1.3 核心服务实现
RPC服务实现类
class MsgTransmitServiceImpl : public bite_im::MsgTransmitService {
public:// 构造函数注入所有依赖MsgTransmitServiceImpl(std::shared_ptr<odb::core::database> &mysql_client,std::shared_ptr<brpcChannelTool::ServiceManager> &service_manager,std::string &user_service_name,std::shared_ptr<RMQTool::RMQClient> rmq_client,const std::string& exchange_name,const std::string& routing_key);// RPC方法实现void GetTransmitTarget(::google::protobuf::RpcController* controller,const ::bite_im::NewMessageReq* request,::bite_im::GetTransmitTargetRsp* response,::google::protobuf::Closure* done) override;
private:std::shared_ptr<bite_im::ChatSessionMemberTable> mysql_csmember_;std::shared_ptr<brpcChannelTool::ServiceManager> service_manager_;std::string user_service_name_;std::shared_ptr<RMQTool::RMQClient> rmq_client_;std::string exchange_name_;std::string routing_key_;
};
业务逻辑流程
void MsgTransmitServiceImpl::GetTransmitTarget(...) {brpc::ClosureGuard guard(done); // RAII确保回调执行try {// 1. 请求参数提取std::string uid = request->user_id();std::string chat_ssid = request->chat_session_id();// 2. RPC调用用户服务获取发送者信息bite_im::UserService_Stub user_stub(service_manager_->Choose(user_service_name_).get());bite_im::GetUserInfoRsp user_rsp;user_stub.GetUserInfo(&ctl, &user_req, &user_rsp, nullptr);// 3. 构建消息响应体bite_im::MessageInfo* msg = response->mutable_message();msg->set_timestamp(time(nullptr));msg->set_message_id(bite_im::uuid());msg->mutable_sender()->CopyFrom(user_rsp.user_info());// 4. 查询会话成员std::vector<ChatSessionMember> users = mysql_csmember_->members(chat_ssid);for(auto& member : users)response->add_target_id_list(member.user_id());// 5. 发布到消息队列rmq_client_->Publisher(exchange_name_, routing_key_, msg->SerializeAsString());response->set_success(true);} catch (const std::exception& e) {// 统一异常处理response->set_success(false);response->set_errmsg("触发消息转发异常:" + std::string(e.what()));}
}
1.4 数据访问层
数据库表映射
#pragma db object table("chat_session_member")
class ChatSessionMember {
private:#pragma db id autounsigned long id_; // 自增主键#pragma db type("varchar(64)") indexstd::string session_id_; // 会话ID#pragma db type("varchar(64)")std::string user_id_; // 用户ID
};
数据访问封装
class ChatSessionMemberTable {
public:// 核心操作方法bool append(ChatSessionMember& member); // 添加成员bool remove(const std::string& ssid); // 移除会话std::vector<ChatSessionMember> members(const std::string& ssid); // 查询成员private:std::shared_ptr<odb::core::database> db_; // 数据库连接
};
测试端详细设计
2.1 测试端入口 (测试代码)
int main(int argc, char* argv[]) {// 1. 测试框架初始化testing::InitGoogleTest(&argc, argv);gflags::ParseCommandLineFlags(&argc, &argv, true);// 2. 日志系统初始化LogModule::Log::Init(FLAGS_log_mode, FLAGS_log_file, FLAGS_log_level, FLAGS_log_output_mode);// 3. 服务发现初始化manager = std::make_shared<brpcChannelTool::ServiceManager>();manager->set_is_follow(false); // 关闭关注模式// 4. 设置etcd监视器Etcd_Tool::MonitorEtcd m(FLAGS_registry_host);Etcd_Tool::monitor_t info;info.monitor_path_ = FLAGS_transmite_host;info.put_on_ = std::bind(&brpcChannelTool::ServiceManager::Online, ...);info.put_off_ = std::bind(&brpcChannelTool::ServiceManager::Offline, ...);m.PushMonitor(info);// 5. 等待服务发现完成if (!m.WaitForInitialDiscovery()) {LOG_ERROR("服务发现初始化失败");return -1;}// 6. 运行所有测试用例return RUN_ALL_TESTS();
}
2.2 测试用例设计
TEST(消息转发测试, 消息转发一号测试) {// 1. 准备阶段 - 构建RPC客户端auto ptr = manager->Choose(FLAGS_transmite_host);ASSERT_TRUE(ptr != nullptr);bite_im::MsgTransmitService_Stub MsgTransmit_stub(ptr.get());// 2. 请求构造bite_im::NewMessageReq req;req.set_request_id(bite_im::uuid()); // 唯一请求IDreq.set_user_id("896296-ef487c-0002"); // 测试用户IDreq.set_chat_session_id("ssid1"); // 测试会话ID// 构建消息内容bite_im::MessageContent* msg = req.mutable_message();msg->set_message_type(bite_im::MessageType::STRING);msg->mutable_string_message()->set_content("My name is Test!");// 3. 执行测试brpc::Controller ctl;bite_im::GetTransmitTargetRsp rsp;MsgTransmit_stub.GetTransmitTarget(&ctl, &req, &rsp, nullptr);// 4. 断言验证ASSERT_EQ(ctl.Failed(), false); // RPC调用成功ASSERT_EQ(rsp.success(), true); // 业务逻辑成功ASSERT_EQ(rsp.request_id(), req.request_id()); // 请求ID匹配// 5. 详细日志输出for(auto& i : rsp.target_id_list()) {LOG_INFO("id list-id:{}", i); // 输出目标用户列表}// 6. 消息内容验证bite_im::MessageInfo rsp_msg = rsp.message();LOG_INFO("rsp_msg user_id:{}", rsp_msg.sender().user_id());LOG_INFO("rsp_msg nickname:{}", rsp_msg.sender().nickname());LOG_INFO("rsp_msg chat_session_id:{}", rsp_msg.chat_session_id());LOG_INFO("rsp_msg message_id:{}", rsp_msg.message_id());
}
2.3 测试策略分析
测试覆盖点:
- RPC通信测试 - 验证服务端可访问性
- 业务逻辑测试 - 验证消息转发核心逻辑
- 数据一致性测试 - 验证请求响应ID匹配
- 集成测试 - 验证整个调用链路的正确性
测试数据设计:
// 用户ID设计:包含多种格式测试
"896296-ef487c-0002" // 标准UUID格式
"user123" // 简单ID格式
"test-user-001" // 带前缀格式// 会话ID设计:
"ssid1" // 简单会话ID
"group-chat-001" // 群聊会话ID
"private-chat-123" // 私聊会话ID// 消息类型覆盖:
MessageType::STRING // 文本消息
MessageType::IMAGE // 图片消息
MessageType::FILE // 文件消息
三、配置管理系统
3.1 服务端配置
// 基础服务配置
DEFINE_string(registry_host, "http://127.0.0.1:2379", "etcd服务注册中心地址");
DEFINE_string(base_service, "/service", "服务监控根目录");
DEFINE_string(instance_name, "/transmite_service/instance", "当前实例名称");
DEFINE_string(access_host, "127.0.0.1:10004", "当前实例的外部访问地址");// RPC服务配置
DEFINE_int32(rpc_listen_port, 10004, "Rpc服务器监听端口");
DEFINE_int32(rpc_timeout, -1, "Rpc调用超时时间");
DEFINE_int32(rpc_threads, 1, "Rpc的IO线程数量");// 数据库配置
DEFINE_string(dbuser, "root", "数据库用户名");
DEFINE_string(dbpassword, "123456", "数据库密码");
DEFINE_string(dbname, "user", "数据库名称");
DEFINE_string(dbhost, "127.0.0.1", "数据库主机地址");
DEFINE_int32(dbport, 3306, "数据库端口");// 消息队列配置
DEFINE_string(rmquser, "root", "rmq用户名");
DEFINE_string(rmqpassword, "123456", "rmq密码");
DEFINE_string(rmq_host, "127.0.0.1", "rmq主机地址");
DEFINE_int32(rmq_port, 5672, "rmq端口号");
3.2 测试端配置
// 日志配置
DEFINE_bool(log_mode, false, "日志模式");
DEFINE_bool(log_output_mode, false, "控制台模式");
DEFINE_string(log_file, "app.log", "日志文件");
DEFINE_int32(log_level, 0, "日志等级");// 服务发现配置
DEFINE_string(registry_host, "http://127.0.0.1:2379", "etcd地址");
DEFINE_string(base_service, "/service", "服务根目录");
DEFINE_string(user_host, "/service/user_service", "用户服务路径");
DEFINE_string(transmite_host, "/service/transmite_service", "转发服务路径");
3.3 构建流程
- 代码生成阶段:
- 生成ProtoBuf序列化代码
- 生成ODB数据库映射代码
- 编译链接阶段:
- 编译业务逻辑代码
- 链接所有依赖库
- 目标产出:
transmiteServer- 消息转发服务端testClient- 自动化测试客户端
部署与运行
4.1 服务端启动
# 1. 直接运行
./transmiteServer \--registry_host="http://127.0.0.1:2379" \--rpc_listen_port=10004 \--dbhost="127.0.0.1" \--dbuser="root" \--dbpassword="123456"# 2. 后台运行
nohup ./transmiteServer [参数] > server.log 2>&1 &
4.2 测试端执行
# 运行所有测试用例
./testClient \--log_level=0 \--registry_host="http://127.0.0.1:2379" \--transmite_host="/service/transmite_service"# 查看测试结果
[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from 消息转发测试
[ RUN ] 消息转发测试.消息转发一号测试
[ OK ] 消息转发测试.消息转发一号测试 (15 ms)
[----------] 1 test from 消息转发测试 (15 ms total)
CMakeLists.txt
CMake构建配置
# 1. 添加cmake版本说明
cmake_minimum_required(VERSION 3.1.3)
# 2. 声明工程名称
project(transmiteServer)# 3. 检测并生成proto框架代码
# 1. 添加所需的proto映射代码文件名称
set(proto_path ${CMAKE_CURRENT_SOURCE_DIR}/../proto)
set(proto_transmites transmite.proto base.proto user.proto)
# 2. 检测框架代码文件是否已经生成
set(proto_h "")
set(proto_cc "")
set(proto_srcs "")
foreach(proto_transmite ${proto_transmites})
# 3. 如果没有生成,则预定义生成指令 -- 用于在构建项目之间先生成框架代码string(REPLACE ".proto" ".pb.h" proto_h ${proto_transmite})string(REPLACE ".proto" ".pb.cc" proto_cc ${proto_transmite})if (NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})add_custom_command(PRE_BUILDCOMMAND protocARGS --cpp_out=${CMAKE_CURRENT_BINARY_DIR}-I ${proto_path} --experimental_allow_proto3_optional${proto_path}/${proto_transmite}DEPENDS ${proto_path}/${proto_transmite}OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc}COMMENT "生成proto框架代码文件:" ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})endif()
# 4. 将所有生成的框架源码文件名称保存起来 student.pb.cc classes.pb.cclist(APPEND proto_srcs ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})
endforeach()# 3.1 检测并生成ODB框架代码
# 1. 添加所需的odb映射代码文件名称
set(odb_path ${CMAKE_CURRENT_SOURCE_DIR}/../odb)
set(odb_files chat_session_member.hxx)
# 2. 检测框架代码文件是否已经生成
set(odb_hxx "")
set(odb_cxx "")
set(odb_srcs "")
foreach(odb_file ${odb_files})
# 3. 如果没有生成,则预定义生成指令 -- 用于在构建项目之间先生成框架代码string(REPLACE ".hxx" "-odb.hxx" odb_hxx ${odb_file})string(REPLACE ".hxx" "-odb.cxx" odb_cxx ${odb_file})if (NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}${odb_cxx})add_custom_command(PRE_BUILDCOMMAND odbARGS -d mysql --std c++17 -q -s --profile boost/date-time --output-dir ${CMAKE_CURRENT_BINARY_DIR} ${odb_path}/${odb_file}DEPENDS ${odb_path}/${odb_file}OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${odb_cxx}COMMENT "生成ODB框架代码文件:" ${CMAKE_CURRENT_BINARY_DIR}/${odb_cxx})endif()
# 4. 将所有生成的框架源码文件名称保存起来 student-odb.cxx classes-odb.cxxlist(APPEND odb_srcs ${CMAKE_CURRENT_BINARY_DIR}/${odb_cxx})
endforeach()# 4. 获取源码目录下的所有源码文件
set(src_transmites "")
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/source src_transmites)set(test_src_transmites "")
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/test test_src_transmites)
# 5. 声明目标及依赖
set(target "transmiteServer")
set(test_client "testClient")
add_executable(${target} ${src_transmites} ${proto_srcs} ${odb_srcs})
add_executable(${test_client} ${test_src_transmites} ${proto_srcs} ${odb_srcs})
# 6. 设置头文件默认搜索路径
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/source)
include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../common)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../odb)
# 7. 设置需要连接的库
target_link_libraries(${target} -lprotobuf -lgtest
-lgflags -lspdlog -lbrpc -lpthread -lssl -lleveldb -lcrypto
-lcpr -lamqpcpp -lev -lodb-mysql -lodb -lmysqlclient
-letcd-cpp-api -lcpprest -lcurl -lfmt -ljsoncpp -L/usr/local/lib)target_link_libraries(${test_client} -lprotobuf -lgtest
-lgflags -lspdlog -lbrpc -lpthread -lssl -lleveldb -lcrypto
-lcpr -lamqpcpp -lev -lodb-mysql -lodb -lmysqlclient
-letcd-cpp-api -lcpprest -lcurl -lfmt -ljsoncpp -L/usr/local/lib)#8. 设置安装路径
INSTALL(TARGETS ${target} RUNTIME DESTINATION bin)
INSTALL(TARGETS ${test_client} RUNTIME DESTINATION bin)# 生成ProtoBuf代码
add_custom_command(PRE_BUILD COMMAND protoc ...)# 生成ODB数据库映射代码
add_custom_command(PRE_BUILD COMMAND odb ...)# 链接依赖库
target_link_libraries(${target} -lprotobuf -lbrpc -lodb-mysql -lamqpcpp -letcd-cpp-api ...)
性能优化考虑
- 连接池管理:MySQL连接池复用数据库连接
- 异步处理:使用brpc的异步RPC调用
- 批量操作:支持数据库批量插入和查询
- 资源清理:使用智能指针自动管理资源
扩展性设计
- 微服务架构:易于水平扩展
- 配置外部化:所有配置通过命令行参数传入
- 插件化设计:各组件通过接口隔离,易于替换
