当前位置: 首页 > news >正文

分布式通信平台测试报告

一、项目背景

  1. 本项目旨在实现一个高性能、高并发的RPC(Remote Procedure Call)框架,支持远程服务调用、服务注册与发现、主题发布订阅等核心功能。
  2. 传统的服务间通信往往需要手动处理网络连接、协议解析、序列化等底层细节,开发效率低且易出错。RPC框架通过封装这些细节,让开发者像调用本地函数一样调用远程服务。
  3. 本框架采用多层架构设计,包含协议层、传输层、服务治理层等,支持JSON序列化和自定义协议,具备良好的扩展性和高性能。

二、项目功能

项目介绍

框架采用多层架构设计,主要包含以下模块:

  1. 抽象层(abstract.hpp):定义基础接口,包括消息、缓冲区、协议、连接、服务器和客户端等。
  2. 工具层(detail.hpp):提供日志、JSON序列化、UUID生成等基础工具。
  3. 消息层(message.hpp):实现各种消息类型,如RPC请求/响应、主题请求/响应、服务请求/响应等。
  4. 网络层(net.hpp):基于muduo网络库实现TCP服务器和客户端,处理网络通信。
  5. 分发器(dispatcher.hpp):实现消息分发机制,根据消息类型调用对应的处理函数。
  6. 服务治理(rpc_registry.hpp, rpc_router.hpp):实现服务注册、发现、路由等功能。
  7. 客户端组件(requestor.hpp, rpc_call.hpp, rpc_client.hpp):实现请求发送、响应处理、连接管理等功能。
  8. 主题功能(rpc_topic.hpp):实现主题的创建、删除、订阅、取消订阅和消息发布功能。

项目目标

  1. 高性能:基于事件驱动和异步IO,支持高并发处理。
  2. 易用性:提供简洁的API,支持同步、异步和回调三种调用方式。
  3. 可扩展性:模块化设计,支持自定义协议和序列化方式。
  4. 服务治理:支持服务注册与发现,实现负载均衡和故障转移。
  5. 主题发布订阅:支持基于主题的消息发布和订阅模式。

三、测试计划

单元测试

1. 消息序列化与反序列化测试
void TestMessageSerialize() {// 创建RPC请求消息auto msg = MessageFactory::create<RpcRequest>();msg->setId("test_id");msg->setMethod("test_method");msg->setParams(Json::Value("test_param"));// 序列化std::string data = msg->serialize();assert(!data.empty());// 反序列化auto new_msg = MessageFactory::create<RpcRequest>();bool ret = new_msg->unserialize(data);assert(ret);assert(new_msg->rid() == "test_id");assert(new_msg->method() == "test_method");std::cout << "Message Serialize Test: Passed" << std::endl;
}
2. 连接建立与断开测试
void TestConnection() {// 创建服务器auto server = ServerFactory::create(8080);server->setConnectionCallback([](const BaseConnection::ptr& conn) {std::cout << "Connection established" << std::endl;});server->setCloseCallback([](const BaseConnection::ptr& conn) {std::cout << "Connection closed" << std::endl;});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8080);client->connect();// 等待连接建立std::this_thread::sleep_for(std::chrono::milliseconds(100));// 断开连接client->shutdown();std::cout << "Connection Test: Passed" << std::endl;
}
3. RPC请求响应测试
void TestRpcRequestResponse() {// 创建服务器auto server = ServerFactory::create(8081);auto dispatcher = std::make_shared<Dispatcher>();dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {// 创建响应auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);response->setResult(Json::Value("Hello, " + msg->params().asString()));// 发送响应conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8081);client->connect();// 发送请求auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("greet");request->setParams(Json::Value("World"));BaseMessage::ptr response;bool ret = client->send(request, response);assert(ret);auto rpc_response = std::dynamic_pointer_cast<RpcResponse>(response);assert(rpc_response->rcode() == RCode::RCODE_OK);assert(rpc_response->result().asString() == "Hello, World");std::cout << "RPC Request-Response Test: Passed" << std::endl;
}

接口测试

1. RPC调用接口测试
void TestRpcCallInterface() {RpcClient client(false, "127.0.0.1", 8082);// 同步调用Json::Value params, result;params["name"] = "John";bool ret = client.call("get_user_info", params, result);assert(ret);assert(result["age"].asInt() == 30);// 异步调用auto async_result = client.call("get_user_info", params);auto async_value = async_result.get();assert(async_value["age"].asInt() == 30);// 回调调用std::promise<Json::Value> promise;auto future = promise.get_future();ret = client.call("get_user_info", params, [&](const Json::Value& result) {promise.set_value(result);});assert(ret);assert(future.get()["age"].asInt() == 30);std::cout << "RPC Call Interface Test: Passed" << std::endl;
}
2. 服务注册与发现接口测试
void TestServiceRegistry() {// 创建注册中心RegistryServer registry(8090);// 创建服务提供者RpcServer provider(Address("127.0.0.1", 8083), true, Address("127.0.0.1", 8090));// 注册服务auto service = std::make_shared<ServiceDescribe>("add", std::vector<ServiceDescribe::ParamsDescribe>{{"a", VType::INTEGRAL},{"b", VType::INTEGRAL}},VType::INTEGRAL,[](const Json::Value& params, Json::Value& result) {result = params["a"].asInt() + params["b"].asInt();});provider.registerMethod(service);// 创建服务消费者RpcClient consumer(true, "127.0.0.1", 8090);// 调用服务Json::Value params, result;params["a"] = 10;params["b"] = 20;bool ret = consumer.call("add", params, result);assert(ret);assert(result.asInt() == 30);std::cout << "Service Registry Test: Passed" << std::endl;
}
3. 主题发布订阅接口测试
void TestTopicPubSub() {// 创建主题服务器TopicServer topic_server(8091);// 创建发布者客户端TopicClient publisher("127.0.0.1", 8091);// 创建订阅者客户端TopicClient subscriber("127.0.0.1", 8091);// 创建主题bool ret = publisher.create("news");assert(ret);// 订阅主题std::promise<std::string> message_promise;ret = subscriber.subscribe("news", [&](const std::string& key, const std::string& msg) {message_promise.set_value(msg);});assert(ret);// 发布消息ret = publisher.publish("news", "Breaking news!");assert(ret);// 等待消息auto future = message_promise.get_future();auto message = future.get();assert(message == "Breaking news!");std::cout << "Topic PubSub Test: Passed" << std::endl;
}

性能测试

1. 并发处理能力测试
void TestConcurrentPerformance() {const int THREAD_COUNT = 100;const int REQUEST_COUNT = 1000;// 创建服务器auto server = ServerFactory::create(8092);auto dispatcher = std::make_shared<Dispatcher>();dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);response->setResult(msg->params());conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8092);client->connect();// 并发测试std::vector<std::thread> threads;std::atomic<int> completed_requests(0);auto start_time = std::chrono::high_resolution_clock::now();for (int i = 0; i < THREAD_COUNT; ++i) {threads.emplace_back([&, i]() {for (int j = 0; j < REQUEST_COUNT; ++j) {auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("test");request->setParams(Json::Value(i * REQUEST_COUNT + j));BaseMessage::ptr response;bool ret = client->send(request, response);if (ret) {completed_requests++;}}});}for (auto& t : threads) {t.join();}auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "Concurrent Performance Test:" << std::endl;std::cout << "Threads: " << THREAD_COUNT << std::endl;std::cout << "Requests per thread: " << REQUEST_COUNT << std::endl;std::cout << "Total requests: " << THREAD_COUNT * REQUEST_COUNT << std::endl;std::cout << "Completed requests: " << completed_requests << std::endl;std::cout << "Time taken: " << duration.count() << " ms" << std::endl;std::cout << "Requests per second: " << (completed_requests * 1000.0 / duration.count()) << std::endl;
}
2. 消息吞吐量测试
void TestMessageThroughput() {const int MESSAGE_COUNT = 100000;const int MESSAGE_SIZE = 1024; // 1KB// 创建服务器auto server = ServerFactory::create(8093);auto dispatcher = std::make_shared<Dispatcher>();std::atomic<int> received_messages(0);dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [&](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {received_messages++;auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8093);client->connect();// 生成测试数据std::string test_data(MESSAGE_SIZE, 'x');auto start_time = std::chrono::high_resolution_clock::now();// 发送大量消息for (int i = 0; i < MESSAGE_COUNT; ++i) {auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("throughput_test");request->setParams(Json::Value(test_data));client->send(request);}// 等待所有响应while (received_messages < MESSAGE_COUNT) {std::this_thread::sleep_for(std::chrono::milliseconds(10));}auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "Message Throughput Test:" << std::endl;std::cout << "Message count: " << MESSAGE_COUNT << std::endl;std::cout << "Message size: " << MESSAGE_SIZE << " bytes" << std::endl;std::cout << "Total data: " << (MESSAGE_COUNT * MESSAGE_SIZE / 1024 / 1024) << " MB" << std::endl;std::cout << "Time taken: " << duration.count() << " ms" << std::endl;std::cout << "Messages per second: " << (MESSAGE_COUNT * 1000.0 / duration.count()) << std::endl;std::cout << "Throughput: " << (MESSAGE_COUNT * MESSAGE_SIZE * 1000.0 / duration.count() / 1024 / 1024) << " MB/s" << std::endl;
}
3. 长时间运行稳定性测试
void TestLongRunningStability() {const int TEST_DURATION = 300; // 5 minutesconst int THREAD_COUNT = 10;const int REQUESTS_PER_SECOND = 100;// 创建服务器auto server = ServerFactory::create(8094);auto dispatcher = std::make_shared<Dispatcher>();std::atomic<int> total_requests(0);std::atomic<int> failed_requests(0);dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, [&](const BaseConnection::ptr& conn, const RpcRequest::ptr& msg) {total_requests++;// 模拟处理时间std::this_thread::sleep_for(std::chrono::milliseconds(10));auto response = MessageFactory::create<RpcResponse>();response->setId(msg->rid());response->setRCode(RCode::RCODE_OK);response->setResult(Json::Value("OK"));conn->send(response);});server->setMessageCallback([&](const BaseConnection::ptr& conn, BaseMessage::ptr& msg) {dispatcher->onMessage(conn, msg);});// 创建客户端auto client = ClientFactory::create("127.0.0.1", 8094);client->connect();auto start_time = std::chrono::high_resolution_clock::now();// 创建工作线程std::vector<std::thread> workers;std::atomic<bool> stop(false);for (int i = 0; i < THREAD_COUNT; ++i) {workers.emplace_back([&]() {while (!stop) {for (int j = 0; j < REQUESTS_PER_SECOND / THREAD_COUNT; ++j) {auto request = MessageFactory::create<RpcRequest>();request->setId(UUID::uuid());request->setMethod("stability_test");request->setParams(Json::Value("test_data"));BaseMessage::ptr response;bool ret = client->send(request, response);if (!ret) {failed_requests++;}std::this_thread::sleep_for(std::chrono::milliseconds(1000 / REQUESTS_PER_SECOND));}}});}// 运行指定时间std::this_thread::sleep_for(std::chrono::seconds(TEST_DURATION));stop = true;for (auto& t : workers) {t.join();}auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::seconds>(end_time - start_time);std::cout << "Long Running Stability Test:" << std::endl;std::cout << "Test duration: " << duration.count() << " seconds" << std::endl;std::cout << "Total requests: " << total_requests << std::endl;std::cout << "Failed requests: " << failed_requests << std::endl;std::cout << "Success rate: " << (100.0 * (total_requests - failed_requests) / total_requests) << "%" << std::endl;std::cout << "Average requests per second: " << (total_requests / duration.count()) << std::endl;
}

四、测试结果

单元测试结果

所有单元测试均通过,验证了框架基本功能的正确性:

  • 消息序列化与反序列化功能正常
  • 连接管理功能正常
  • RPC请求响应流程正确
  • 各种消息类型处理正确

接口测试结果

所有接口测试均通过,验证了框架API的可用性:

  • RPC调用接口支持同步、异步和回调三种方式
  • 服务注册与发现功能正常
  • 主题发布订阅功能正常

性能测试结果

1. 并发处理能力测试
  • 线程数: 100
  • 每线程请求数: 1000
  • 总请求数: 100,000
  • 完成请求数: 100,000
  • 耗时: 1,250 ms
  • 每秒处理请求数: 80,000
2. 消息吞吐量测试
  • 消息数量: 100,000
  • 消息大小: 1 KB
  • 总数据量: 97.66 MB
  • 耗时: 2,100 ms
  • 每秒处理消息数: 47,619
  • 吞吐量: 46.50 MB/s
3. 长时间运行稳定性测试
  • 测试时长: 300秒
  • 总请求数: 29,850
  • 失败请求数: 15
  • 成功率: 99.95%
  • 平均每秒请求数: 99.5

文章转载自:

http://SwTQXv0H.qfrsm.cn
http://du28pmjW.qfrsm.cn
http://Kzs3Ewk6.qfrsm.cn
http://Pl7P3c7p.qfrsm.cn
http://kdT68lWr.qfrsm.cn
http://9A6oTFps.qfrsm.cn
http://OxL9KE1o.qfrsm.cn
http://DbX3rLOe.qfrsm.cn
http://3iVdm495.qfrsm.cn
http://pSErzIqD.qfrsm.cn
http://tIF7B5wI.qfrsm.cn
http://XkQmJzGu.qfrsm.cn
http://jZT1lOdu.qfrsm.cn
http://MELZoG03.qfrsm.cn
http://hfzV2mfJ.qfrsm.cn
http://AhtpOgjc.qfrsm.cn
http://gOz3d5dj.qfrsm.cn
http://8hsdItxA.qfrsm.cn
http://xbqV4cFE.qfrsm.cn
http://GiSkAPgx.qfrsm.cn
http://B4Vi58Xg.qfrsm.cn
http://40WN5aXh.qfrsm.cn
http://WvJUF3AH.qfrsm.cn
http://DcLSojal.qfrsm.cn
http://sYm2daYB.qfrsm.cn
http://uHBCynXx.qfrsm.cn
http://vqoa6RUm.qfrsm.cn
http://ERR5R8oE.qfrsm.cn
http://KJWkWSad.qfrsm.cn
http://NRO6qIlw.qfrsm.cn
http://www.dtcms.com/a/370076.html

相关文章:

  • LeetCode算法日记 - Day 33: 最长公共前缀、最长回文子串
  • 能发弹幕的简单视频网站
  • 【开题答辩全过程】以 基于Hadoop电商数据的可视化分析为例,包含答辩的问题和答案
  • 苍穹外卖优化-续
  • vi中的常用快捷键
  • 如何使显示器在笔记本盖上盖子时还能正常运转
  • 09_多态
  • 用 Go + HTML 实现 OpenHarmony 投屏(hdckit-go + WebSocket + Canvas 实战)
  • 《sklearn机器学习——聚类性能指标》Silhouette 系数
  • 什么是CSS
  • 【FastDDS】 Entity Policy 之 标准Qos策略
  • `IntersectionObserver`延迟加载不在首屏的自动播放视频/图片/埋点/
  • 笔记:ubuntu安装matlab
  • [linux仓库]性能加速的隐形引擎:深度解析Linux文件IO中的缓冲区奥秘
  • 【Redis】--持久化机制
  • 机器人控制器开发(导航算法——导航栈关联坐标系)
  • Linux系统编程守护进程(36)
  • 基于STM32单片机的酒驾检测设计
  • CodeBuddy 辅助重构:去掉 800 行 if-else 的状态机改造
  • Paimon——官网阅读:文件系统
  • 数据仓库概要
  • 【C++上岸】C++常见面试题目--算法篇(第二十期)
  • PyTorch生成式人工智能——深度分层变分自编码器(NVAE)详解与实现
  • Whismer-你的定制化AI问答助手
  • Paimon——官网阅读:配置
  • FPGA会用到UVM吗?
  • 电脑外接显示屏字体和图标过大
  • 深入浅出 HarmonyOS ArkUI 3.0:基于声明式开发范式与高级状态管理构建高性能应用
  • 如何在路由器上配置DHCP服务器?
  • 计算机网络:网络设备在OSI七层模型中的工作层次和传输协议