QT聊天项目DAY08
1.使用IOContextPool提高并发量
1.1 新建连接池类
#ifndef __ASIOIOSERVICEPOOL_H__
#define __ASIOIOSERVICEPOOL_H__#include <vector>
#include <boost/asio.hpp>
#include "Singletion.h"
class AsioIOServicePool : public Singletion<AsioIOServicePool>
{friend Singletion<AsioIOServicePool>;public:using IOService = boost::asio::io_context;/* 防止io_context.run() 提前退出 */using Work = boost::asio::io_context::work;using WorkPtr = std::unique_ptr<Work>;public:~AsioIOServicePool();AsioIOServicePool(const AsioIOServicePool&) = delete;AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;public:IOService& GetIOService();void Stop();private:AsioIOServicePool(std::size_t size = 2);private:std::vector<IOService> m_ioServices;std::vector<WorkPtr> m_works;std::vector<std::thread> m_threads;std::size_t m_nextIOService;
};#endif // __ASIOIOSERVICEPOOL_H__
这里写着写着突然让我领会到了STL源码解析的重要性
在m_threads.emplace_back([this, i]()
{
m_ioServices[i].run();
});
里可以采用thread([this,i](){})的方式写,但是会导致多复制一次,所以使用临时对象的方式来添加线程
#include "AsioIOServicePool.h"
#include <iostream>
using namespace std;AsioIOServicePool::AsioIOServicePool(std::size_t size):m_ioServices(size),m_works(size),m_nextIOService(0)
{for (size_t i = 0; i < size; ++i){/* 每个IOService对象要有一个Work对象,否则它在没有任务时会立即退出run() */ m_works[i] = unique_ptr<Work>(new Work(m_ioServices[i]));m_threads.emplace_back([this, i](){m_ioServices[i].run();});}
}AsioIOServicePool::~AsioIOServicePool()
{Stop();cout << "AsioIOServicePool::~AsioIOServicePool()" << endl;
}AsioIOServicePool::IOService& AsioIOServicePool::GetIOService()
{IOService& ioService = m_ioServices[m_nextIOService];m_nextIOService = (m_nextIOService + 1) % m_ioServices.size();return ioService;
}void AsioIOServicePool::Stop()
{/* 先停止已经绑定了读或写的监听事件的服务,然后重置Work对象 */for (auto& work : m_works){work->get_io_context().stop();work.reset();}/* 等待所有线程退出 */for (auto& thread : m_threads){if (thread.joinable()){thread.join();}}
}
io_context像一个任务调度中心,Wrok是一个"假任务",告诉调度中心:"你别关门,我一会还有任务要来"
1.2 关于线程的一些知识点
当添加线程时,这个线程就已经启动了
也就是已经开始监听了
1.3 什么是io_context事件循环?
关于epoll的本质可以看下面这篇文章,我这里不做阐述了
https://zhuanlan.zhihu.com/p/17856755436#:~:text=socket%E4%BA%8B%E4%BB%B6%E6%B7%BB%E5%8A%A0%E6%88%90%E5%8A%9F%E5%90%8E%EF%BC%8Cepoll%E6%89%8D%E8%83%BD%E7%9B%91%E5%90%ACsocket%E8%AF%BB%E5%86%99%E4%BA%8B%E4%BB%B6%E3%80%82%20%E5%A6%82%E6%9E%9Cepoll%E5%B0%B1%E7%BB%AA%E9%98%9F%E5%88%97%E6%9C%89%E5%B0%B1%E7%BB%AA%E4%BA%8B%E4%BB%B6%EF%BC%8C%E7%94%A8%E6%88%B7%E7%A8%8B%E5%BA%8F%E8%B0%83%E7%94%A8epoll_wait%E5%87%BD%E6%95%B0%E4%BC%9A%E6%88%90%E5%8A%9F%E8%8E%B7%E5%8F%96%E5%88%B0%E5%B0%B1%E7%BB%AA%E4%BA%8B%E4%BB%B6%E3%80%82,%E5%A6%82%E6%9E%9C%E6%B2%A1%E6%9C%89%E5%B0%B1%E7%BB%AA%E4%BA%8B%E4%BB%B6%EF%BC%8C%E5%88%99epoll%E7%BA%BF%E7%A8%8B%E9%99%B7%E5%85%A5%E4%BC%91%E7%9C%A0%E3%80%82%20%E5%BD%93socket%E6%8E%A5%E6%94%B6%E5%88%B0%E6%95%B0%E6%8D%AE%E5%90%8E%EF%BC%8C%E9%80%9A%E8%BF%87socket%E7%AD%89%E5%BE%85%E9%98%9F%E5%88%97%E5%8F%AF%E4%BB%A5%E5%94%A4%E9%86%92%E4%BC%91%E7%9C%A0%E7%9A%84epoll%E7%BA%BF%E7%A8%8B%EF%BC%8C%E5%B9%B6%E5%B0%86socket%E5%B0%81%E8%A3%85%E6%88%90epoll%E5%B0%B1%E7%BB%AA%E4%BA%8B%E4%BB%B6%E6%8F%92%E5%85%A5%E5%B0%B1%E7%BB%AA%E9%98%9F%E5%88%97%E3%80%82
本质上就是一个epoll()监听外加一个事件分发器,假设有三种事件类型,按照正常的linux开发来说应该是三种,分别是连接事件,连接中有数据到来事件以及连接的读写事件,epoll会循环的监听这三种事件,epool监听到这三种事件会将事件分发给处理该事件的对象,其实这就和io_context差不多了,本质上都是在监听事件有没有被触发,被触发了就执行对应的回调
int main(int argc, char *argv[]){structepoll_eventev, events[MAX_EVENTS];int sock_fd, ret = 0;int efd = epoll_create(10); //创建epoll实例ev.data.fd = sock_fd;ev.events = EPOLLIN; //注册监听端口连接事件epoll_ctl(efd, EPOLL_CTL_ADD, sock_fd, &ev);while (1) {// 超时1000毫秒,获取就绪事件int nfds = epoll_wait(efd, events, MAX_EVENTS, 1000);if (nfds == -1) return-1; // 获取失败退出elseif (nfds == 0) continue; // 超时,继续下一轮事件获取// 轮询就绪事件数组for (int i = 0; i < nfds; i++) { int fd = events[i].data.fd;// 新连接到来if (fd == sock_fd) {new_fd = accept(sock_fd, (struct sockaddr *)&peer, &addrlen);setnonblocking(new_fd); // 设置新套接字为非阻塞模式ev.data.fd = new_fd;ev.events = EPOLLIN|EPOLLET; // 添加新套接字epoll_ctl(efd, EPOLL_CTL_ADD, new_fd, &ev); // 添加新的监听,等待该事件到来} else { // 已经分配的连接有数据到来if (events[i].events & EPOLLIN) { // EPOLLIN事件recv(fd, recv_buf, len, 0); // 读取数据/* 对该数据做对应的处理, 这里可以分配线程去单独的处理每一个事件 */ }}} }return 0;
}
下面就是io_context事件循环绑定了监听端口是否有连接事件,当事件触发时会直接调用对应的回调函数,直接简化了上述的epoll操作,非常nice
所以创建了两个线程去不停的轮询事件循环,只需要在端口号到来时绑定事件,去监听事件中是否有数据到来即可。如何绑定事件的关键在于async的类型
调用了什么async_*函数,就绑定了对应类型的事件,如下
async_*的调用本身就是注册事件的过程,这些事件都会被挂载到io_context的事件队列中,等待epoll/select/kqueue检测和触发
将两个线程中的事件循环绑定到async_read()上实时的监听socket(连接)中是否有数据到来,就变成了主线程实时监听端口是否有客户端发来的新连接,子线程实时监听连接中客户端是否发来数据
iocontext一直都在轮询监听注册的事件,类似于epoll的边缘触发,也就是不会重复的监听注册的事件,当注册的事件被监听到并执行完回调后,需要在重新注册,一个iocontext可以被多个socket绑定,本质上socket就对应一个事件的监听(就像电话总站和下属的电话机一样,电话总站收到电话,查询是找其下属的电话机,就会把这个连接交给下属的电话机去处理事情,处理完之后,就会把这个电话机移除,后续电话总站也不会处理这个电话机相应的服务哦,也就是说呢,一个电话机总站可以处理多个下属电话机服务);一个事件循环可以处理好几个不同的连接,而不是一个线程只能处理一个连接
io_context 像电话总站(事件循环器)
socket 是电话机(单个事件源)
电话总站轮询注册的 socket,有事件了就派发到对应 handler,处理完自动“移除”。
一个 io_context 能服务多个 socket,多个连接并发处理,不是一个 socket 一个线程。
1.4 改为每次从连接池中获取连接
之前是每有一个连接到来就将套接字分配给新的连接,但是这个连接还是在主线程上运行的,可以打印一下线程ID
这里可以看到分配连接和执行连接逻辑处理都是在一个线程上运行,这样无法做到高并发的
现在修改为将连接的处理逻辑分配到一个单独的线程上去处理
/* 实时的监听该端口是否有客户端发来新的连接 */
void CServer::Start()
{auto& IOService = AsioIOServicePool::GetInstance()->GetIOService();HttpConnection* pConnection = new HttpConnection(IOService); // 创建新的连接对象_acceptor.async_accept(pConnection->GetSocket(), [this, pConnection](boost::system::error_code ec){try {if (ec){Start(); // 重新监听return;}pConnection->Start(); // 启动连接Start(); // 继续监听cout << "CServer Thread ID: " << this_thread::get_id() << "\n";}catch (const std::exception& e) {std::cerr << "CServer::Exception: " << e.what() << "\n";}});
}
管理连接的修改,用事件循环来初始化空套接字,最后在端口中监听客户端连接到来时直接赋值给这个套接字
HttpConnection::HttpConnection(net::io_context& ioContext): _socket(ioContext)
{CheckDeadline(); // 绑定超时的回调
}tcp::socket& HttpConnection::GetSocket()
{return _socket;
}
编译,出现了模板重定义的问题,没有加上
#ifndef _SINGLETON_H_
#define _SINGLETON_H_
#endif
导致的
再编译,最后仍然没有变换线程,可以看出确实是创建出线程了,但是事件循环还是在主线程中运行的
也有可能是回调函数是在线程中进行处理的,更改打印线程id的位置为当事件发生的回调函数里
1.5 关于线程的一些看法
如上文一样,回调函数是放在线程中处理的,这样的操作非常类似于qt中的movetoThread
添加qt的类到普通的c++工程中,如果遇到了报错,需要导入qt环境
在解决方案下面将该工程转换成qt工程
Qt::DirectConnection
槽函数在发送信号的线程中被调用
Qt::QueuedConnection
槽函数在接收者的线程中被调用
Qt::AutoConnection
如果发送者和接收者在同一个线程中,使用DirectConnection
不在同一个线程中使用QueuedConnection
观察源码可以看到connect的最后一个参数默认为AutoConnection了
抽时间一定要好好看看STL源码解析
2. GRPC连接池
当服务器里分配线程去处理连接中的逻辑后,会向grpc服务器(java写的那段代码)发送请求,但是此时只有一个grpc通道,也就是说所有的线程在处理完客户端发来的数据后要共用一个通道去请求grpc服务器发送邮箱验证码。
这样做是不对的,应该线程去从grpc连接池中取出空闲的通道,这样就能够极大的利用线程的效率了
全局只有这里应用了grpc邮箱请求,所以最后的优化放在GetVerifyCode里,如下
2.1 锁
lock_guard锁(RAII锁)
std::mutex mtx;void safe_increment() {std::lock_guard<std::mutex> lock(mtx); // 自动加锁// 临界区
} // 离开作用域时自动解锁
unique_lock锁
std::mutex mtx;
std::condition_variable cv;void wait_for_event() {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [] { return ready; }); // 需要 unique_lock
}
// 假设 ready = false;
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return ready; });
// 等价于以下逻辑(伪代码)while (!ready) {lock.unlock(); // 释放锁block until notified; // 等待唤醒lock.lock(); // 唤醒后重新加锁
}
// 退出 while 时 lock 仍然是加锁状态
2.2 GRPC连接池代码
class StubPool
{
public:StubPool(size_t poolSize): _bStop(false), _poolSize(poolSize){for (size_t i = 0; i < poolSize; i++){int Port = ServerStatic::ParseConfig("./Config/config.json", "VerifyServer", "Port");string Ip = "localhost:" + to_string(Port);shared_ptr<Channel> channel = grpc::CreateChannel(Ip,grpc::InsecureChannelCredentials()); // 创建GRPC通道_stubQueue.push(VerifyService::NewStub(channel)); // 存入stub队列}}~StubPool(){lock_guard<mutex> lock(_mutex); // 加锁_bStop = true;_cv.notify_all(); // 通知所有线程退出for (size_t i = 0; i < _stubQueue.size(); i++){_stubQueue.pop(); // 弹出所有stub}}/* 获取一个stub */unique_ptr<VerifyService::Stub> GetStub(){unique_lock<mutex> lock(_mutex); // 加锁_cv.wait(lock, [this]() {return !_stubQueue.empty(); }); // 阻塞等待,队列非空时才继续往下执行if (_bStop) // 线程池停止return nullptr;unique_ptr<VerifyService::Stub> stub = move(_stubQueue.front()); // 获取stub_stubQueue.pop(); // 弹出stubreturn stub;}/* 归还一个stub */void ReturnStub(unique_ptr<VerifyService::Stub> stub){lock_guard<mutex> lock(_mutex); // 加锁if (_bStop) // 线程池停止return;_stubQueue.push(move(stub)); // 存入stub队列_cv.notify_one(); // 通知一个线程}private:atomic<bool> _bStop; // 该线程池是否停止size_t _poolSize; // 线程池大小queue<unique_ptr<VerifyService::Stub>> _stubQueue; // 存放stub的队列mutex _mutex; // 互斥锁condition_variable _cv; // 条件变量
};
2.3 修改VerifyGrpcClient请求服务器逻辑
*.h
class VerifyGrpcClient : public Singletion<VerifyGrpcClient>
{friend class Singletion<VerifyGrpcClient>;
public:~VerifyGrpcClient();/* 向GRPC服务器请求验证码 */GetVerifyRsponse GetVerifyCode(string email);private:VerifyGrpcClient();StubPool* _stubPool = nullptr; // 存放stub的线程池
};
*.cpp
#include "VerifyGrpcClient.h"VerifyGrpcClient::VerifyGrpcClient()
{_stubPool = new StubPool(5); // 创建连接池
}VerifyGrpcClient::~VerifyGrpcClient()
{}GetVerifyRsponse VerifyGrpcClient::GetVerifyCode(string email)
{ClientContext context; // GRPC上下文GetVerifyRsponse response; // 响应对象GetVerifyRequest request; // 请求对象request.set_email(email); unique_ptr<VerifyService::Stub> stub_ = _stubPool->GetStub(); // 从连接池中获取一个连接(unique_ptr的移动拷贝构造)Status status = stub_->GetVerifyCode(&context, request, &response); // 发起GRPC请求if (status.ok()){return response;}else{response.set_error(ErrorCodes::RPC_FAILED);return response;}_stubPool->ReturnStub(move(stub_)); // 归还连接到连接池,由于stub_是左值,此时需要move一下变成右值
}
2.4 编译
没有问题
测试一下代码是否生效
正常生效,没有任何问题