当前位置: 首页 > news >正文

Linux C/C++ 学习日记(46):io_uring(二):reactor 与proactor的性能测试对比

注:该文用于个人学习记录和知识交流,如有不足,欢迎指点。

一、proreactor 与 reactor的区别

reactor:

  • poll + io
  • EPOLLIN:数据可以读
  • 一个事件对应一个动作

proactor:

  • io_uring
  • EVENT_READ:数据已经读出来了
  • 一个事件对应一个结果

二、测试维度:

  1. 建链时间(建立100万个连接的耗时)
  2. qps:每秒处理的响应数:(测试数据大小依次为128,512,1k,2k的回显响应)
  3. io并发 (能否建立100万个连接)

三、测试结果

这里qps,其他的有时间再测了

1. reactor的测试:epoll+io

128

512

1024

2048

2. proactor的测试:

128

512

1024

2048

四、测试代码:

1. epoll_tcp_server.c (reactor模式)

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>#include <stdio.h>
#include <string.h>
#include <unistd.h>#include <pthread.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/time.h>#define BUFFER_LENGTH	1024
#define CONNECTIONS  1000//1048576
#define PORT 8000
#define PORT_COUNT 1 // 100typedef int (*RCALLBACK)(int fd);int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);struct conn_item {int fd;char rbuffer[BUFFER_LENGTH];int rlen;char wbuffer[BUFFER_LENGTH];int wlen;union {RCALLBACK accept_callback;RCALLBACK recv_callback;} recv_t;RCALLBACK send_callback;
};int epfd = 0;
struct conn_item connlist[CONNECTIONS] = {0}; struct timeval zvoice_king;#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)int set_event(int fd, int event, int flag) {if (flag) { struct epoll_event ev;ev.events = event ;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);} else {struct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}}int accept_cb(int fd) {struct sockaddr_in clientaddr;socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);if (clientfd < 0) {return -1;}set_event(clientfd, EPOLLIN, 1);connlist[clientfd].fd = clientfd;memset(connlist[clientfd].rbuffer, 0, BUFFER_LENGTH);connlist[clientfd].rlen = 0;memset(connlist[clientfd].wbuffer, 0, BUFFER_LENGTH);connlist[clientfd].wlen = 0;connlist[clientfd].recv_t.recv_callback = recv_cb;connlist[clientfd].send_callback = send_cb;if ((clientfd % 1000) == 999) {struct timeval tv_cur;gettimeofday(&tv_cur, NULL);int time_used = TIME_SUB_MS(tv_cur, zvoice_king);memcpy(&zvoice_king, &tv_cur, sizeof(struct timeval));printf("clientfd : %d, time_used: %d\n", clientfd, time_used);}return clientfd;
}int recv_cb(int fd) { char *buffer = connlist[fd].rbuffer;int idx = connlist[fd].rlen;int count = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);if (count == 0) {//printf("disconnect\n");epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);		close(fd);connlist[fd].fd = 0;return -1;}connlist[fd].rlen += count;memcpy(connlist[fd].wbuffer, connlist[fd].rbuffer, connlist[fd].rlen);connlist[fd].wlen = connlist[fd].rlen;connlist[fd].rlen -= connlist[fd].rlen;set_event(fd, EPOLLOUT, 0);return count;
}int send_cb(int fd) {char *buffer = connlist[fd].wbuffer;int idx = connlist[fd].wlen;int count = send(fd, buffer, idx, 0);set_event(fd, EPOLLIN, 0);return count;
}int init_server(unsigned short port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in serveraddr;memset(&serveraddr, 0, sizeof(struct sockaddr_in));serveraddr.sin_family = AF_INET;serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);serveraddr.sin_port = htons(port);if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {perror("bind");return -1;}listen(sockfd, 10);return sockfd;
}int close_server()
{for (int i = 0; i < PORT_COUNT + 100; i++){if (connlist[i].fd != 0){close(connlist[i].fd);}}
}// tcp 
int main() {int i = 0;epfd = epoll_create(1); // int sizefor (i = 0;i < PORT_COUNT;i ++) {int sockfd = init_server(PORT + i);connlist[sockfd].fd = sockfd;connlist[sockfd].recv_t.accept_callback = accept_cb;set_event(sockfd, EPOLLIN, 1);}gettimeofday(&zvoice_king, NULL);struct epoll_event events[CONNECTIONS] = {0};while (1) { int nready = epoll_wait(epfd, events, CONNECTIONS, -1); // int i = 0;for (i = 0;i < nready;i ++) {int connfd = events[i].data.fd;if (events[i].events & EPOLLIN) { //int count = connlist[connfd].recv_t.recv_callback(connfd);//printf("recv count: %d <-- buffer: %s\n", count, connlist[connfd].rbuffer);} else if (events[i].events & EPOLLOUT) { //printf("send --> buffer: %s\n",  connlist[connfd].wbuffer);int count = connlist[connfd].send_callback(connfd);}}}close_server();}

2. uring_tcp_server

#include <stdio.h>          // 标准输入输出(打印日志、错误信息)
#include <liburing.h>       // io_uring库(Linux异步IO框架,替代epoll实现高效IO)
#include <netinet/in.h>     // 网络地址结构(sockaddr_in等IPv4相关定义)
#include <string.h>         // 内存操作(memset、memcpy等)
#include <unistd.h>         // 系统调用(close等)
#include <stdlib.h>         // 动态内存管理(malloc、free等)//编译:gcc -o uring_tcp_server uring_tcp_server.c -luring // 事件类型宏定义:用于标识io_uring中待处理的事件类型
#define EVENT_ACCEPT 0  // 接受客户端连接事件(监听socket专用)
#define EVENT_READ 1    // 从客户端读取数据事件(客户端连接专用)
#define EVENT_WRITE 2   // 向客户端写入数据事件(客户端连接专用)
#define PORT 8000
#define BUFFER_LENGTH 1024/*** 连接上下文结构体:每个客户端连接的专属数据载体* 作用:绑定连接的文件描述符、缓冲区及事件状态,避免多连接数据冲突*/
struct conn_ctx
{int fd;             // 连接的socket文件描述符(监听socket/客户端socket)int event;          // 当前等待的事件类型(对应EVENT_*宏)char rbuffer[BUFFER_LENGTH]; // 专属读缓冲区:存储从客户端读取的数据(1024字节)char wbuffer[BUFFER_LENGTH]; // 专属写缓冲区:存储待发送给客户端的数据(1024字节)ssize_t rlen;       // 实际读取的字节数(记录rbuffer中有效数据长度)ssize_t wlen;       // 待发送的字节数(记录wbuffer中有效数据长度)
};/*** 初始化TCP服务器监听socket* @param port 服务器监听的端口号* @return 成功返回监听socket的文件描述符,失败返回-1*/
int init_server(unsigned short port)
{// 1. 创建TCP socket(IPv4协议族,字节流套接字)// AF_INET:IPv4地址族;SOCK_STREAM:TCP协议(面向连接);0:默认协议(TCP)int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1){perror("socket创建失败");  // 打印错误原因(如权限不足、系统资源耗尽)return -1;}// 2. 初始化服务器地址结构struct sockaddr_in serveraddr;memset(&serveraddr, 0, sizeof(struct sockaddr_in));  // 清空结构体(避免随机值干扰)serveraddr.sin_family = AF_INET;                     // 使用IPv4协议serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);      // 监听所有本地网卡(0.0.0.0)serveraddr.sin_port = htons(port);                   // 端口号转换为网络字节序(大端)// 3. 绑定socket到指定端口(将socket与服务器地址关联)if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr))){perror("bind绑定端口失败");  // 错误可能:端口被占用、无权限使用特权端口close(sockfd);               // 绑定失败需关闭socket释放资源return -1;}// 4. 开始监听连接(将主动socket转为被动socket,准备接受客户端连接)// 第二个参数1024:未完成连接队列(三次握手未完成)的最大长度(高并发场景需调大)listen(sockfd, 1024);return sockfd;  // 返回监听socket的文件描述符
}// io_uring提交队列(SQ)的最大长度:最多同时等待处理1024个IO事件
#define ENTRIES_LENGTH 1024/*** 向io_uring注册"异步读"事件* 作用:告知内核"当该socket有数据可读时,读取数据到专属缓冲区"* @param ring io_uring实例(管理提交队列和完成队列)* @param ctx 连接上下文(包含socket fd、读缓冲区等)* @return 0成功,-1失败*/
int set_event_recv(struct io_uring *ring, struct conn_ctx *ctx)
{// 从io_uring的提交队列(SQ)中获取一个空闲的队列项(SQE)// SQE(Submission Queue Entry):用户态描述待执行的IO操作(此处为读操作)struct io_uring_sqe *sqe = io_uring_get_sqe(ring);if (!sqe)  // 若提交队列已满,获取失败{perror("获取SQE失败");return -1;}// 填充SQE:设置异步读操作(底层对应recv系统调用)// 参数:sqe队列项、目标socket fd、读缓冲区(专属rbuffer)、最大读取长度、 flags(0为默认)io_uring_prep_recv(sqe, ctx->fd, ctx->rbuffer, sizeof(ctx->rbuffer), 0);// 将连接上下文指针存入SQE的user_data:事件完成时通过该字段定位到对应的连接sqe->user_data = (unsigned long)ctx; // user_data是个64位的无符号整数,我们这里做个强转return 0;
}/*** 向io_uring注册"异步写"事件* 作用:告知内核"当该socket可写时,将专属缓冲区的数据发送给客户端"* @param ring io_uring实例* @param ctx 连接上下文(包含socket fd、写缓冲区及待发送长度)* @return 0成功,-1失败*/
int set_event_send(struct io_uring *ring, struct conn_ctx *ctx)
{// 获取一个空闲的SQE(提交队列项)struct io_uring_sqe *sqe = io_uring_get_sqe(ring);if (!sqe){perror("获取SQE失败");return -1;}// 填充SQE:设置异步写操作(底层对应send系统调用)// 参数:sqe队列项、目标socket fd、写缓冲区(专属wbuffer)、待发送长度(rlen记录的读取长度)io_uring_prep_send(sqe, ctx->fd, ctx->wbuffer, ctx->rlen, 0);// 存储连接上下文指针:事件完成时定位到对应的连接sqe->user_data = (unsigned long)ctx;return 0;
}/*** 向io_uring注册"异步接受连接"事件* 作用:告知内核"当有新客户端连接时,接受连接并返回新的socket fd"* @param ring io_uring实例* @param sockfd 监听socket的文件描述符* @return 0成功,-1失败*/
int set_event_accept(struct io_uring *ring, int sockfd)
{// 获取一个空闲的SQE(提交队列项)struct io_uring_sqe *sqe = io_uring_get_sqe(ring);if (!sqe){perror("获取SQE失败");return -1;}// 为接受连接操作分配临时上下文(仅用于存储监听socket信息)struct conn_ctx *listen_ctx = malloc(sizeof(struct conn_ctx));if (!listen_ctx){perror("malloc listen_ctx失败");return -1;}listen_ctx->fd = sockfd;       // 绑定监听socket fdlisten_ctx->event = EVENT_ACCEPT;  // 标记为接受连接事件// 分配客户端地址结构(用于存储新连接的客户端IP和端口)struct sockaddr_in *clientaddr = malloc(sizeof(struct sockaddr_in));socklen_t *len = malloc(sizeof(socklen_t));  // 地址长度变量*len = sizeof(struct sockaddr_in);  // 初始化长度为IPv4地址结构大小// 填充SQE:设置异步接受连接操作(底层对应accept系统调用)// 参数:sqe队列项、监听socket fd、客户端地址指针、地址长度指针、flags(0为默认)io_uring_prep_accept(sqe, sockfd, (struct sockaddr *)clientaddr, len, 0);// 存储临时上下文指针:事件完成时通过该字段获取监听信息sqe->user_data = (unsigned long)listen_ctx;// 临时存储客户端地址和长度的指针(避免使用全局变量,保证线程安全)// 利用listen_ctx的缓冲区暂存指针,后续从这里恢复memcpy(listen_ctx->wbuffer, &clientaddr, sizeof(void *));memcpy(listen_ctx->rbuffer, &len, sizeof(void *));return 0;
}int main(int argc, char *argv[])
{// 初始化服务器:创建并返回监听socketint sockfd = init_server(PORT);if (sockfd == -1)  // 初始化失败则退出{return -1;}printf("服务器启动,监听端口 %d\n", PORT);// 初始化io_uringstruct io_uring_params params;  // io_uring初始化参数(使用默认配置)memset(&params, 0, sizeof(params));  // 清空参数struct io_uring ring;  // io_uring实例(管理提交队列SQ和完成队列CQ)// 初始化io_uring:创建容量为ENTRIES_LENGTH的队列// 内部通过io_uring_setup系统调用创建内核队列,并通过mmap映射到用户态(减少数据拷贝)if (io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params) != 0){perror("io_uring初始化失败");close(sockfd);  // 失败需释放监听socketreturn -1;}// 注册初始事件:向io_uring提交"接受连接"事件(开始监听新连接)set_event_accept(&ring, sockfd);// 主循环:持续处理io_uring中的事件(接受连接、读数据、写数据)while (1){// 1. 提交所有已准备好的SQE(提交队列中的事件)到内核// 内核会异步处理这些IO操作,完成后将结果放入完成队列CQ(无需用户主动轮询)io_uring_submit(&ring);// 2. 批量获取完成队列(CQ)中的事件(最多128个)// 相比单次获取,批量处理减少用户态与内核态交互,提升效率struct io_uring_cqe *cqes[128];  // 存储完成的事件// 从完成队列中获取事件,返回实际获取的数量(nready)int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);if (nready < 0)  // 获取失败(如系统调用错误){perror("io_uring获取完成事件失败");break;}// 3. 遍历处理每个完成的事件for (int i = 0; i < nready; i++){struct io_uring_cqe *cqe = cqes[i];  // 当前完成的事件// 从事件的user_data中恢复连接上下文(之前注册时存入的指针)struct conn_ctx *ctx = (struct conn_ctx *)(cqe->user_data);// 处理"接受连接"事件(EVENT_ACCEPT)if (ctx->event == EVENT_ACCEPT){// 恢复客户端地址和长度的指针(从临时上下文的缓冲区中)struct sockaddr_in **clientaddr = (struct sockaddr_in **)ctx->wbuffer;socklen_t **len = (socklen_t **)ctx->rbuffer;// 重新注册"接受连接"事件(一次accept只能处理一个连接,需持续监听新连接)set_event_accept(&ring, sockfd);// 从完成事件中获取新连接的socket fd(cqe->res是accept的返回值)int connfd = cqe->res;if (connfd == -1)  // 接受连接失败(如队列满){perror("接受连接失败");}else  // 接受连接成功{// 为新连接分配专属上下文(含独立缓冲区,避免多连接数据冲突)struct conn_ctx *new_ctx = malloc(sizeof(struct conn_ctx));if (!new_ctx)  // 内存分配失败{perror("malloc new_ctx失败");close(connfd);  // 关闭新连接,避免资源泄漏}else  // 初始化新连接上下文{memset(new_ctx, 0, sizeof(struct conn_ctx));  // 清空缓冲区new_ctx->fd = connfd;                         // 绑定新连接的fdnew_ctx->event = EVENT_READ;                  // 初始事件为"读"(等待客户端发数据)// 注册读事件:告知内核"当该连接有数据时,读取到专属rbuffer"set_event_recv(&ring, new_ctx);}}// 释放临时资源(监听上下文、客户端地址和长度)free(*clientaddr);  // 释放客户端地址结构free(*len);         // 释放地址长度变量free(ctx);          // 释放监听上下文}// 处理"读数据"事件(EVENT_READ):客户端发送数据后触发else if (ctx->event == EVENT_READ){ssize_t ret = cqe->res;  // ret是recv的返回值:>0为读取字节数;0为客户端关闭;-1为错误if (ret <= 0)  // 客户端关闭连接或读取失败{close(ctx->fd);  // 关闭连接socketfree(ctx);       // 释放连接上下文(避免内存泄漏)}else  // 成功读取到数据(ret为实际读取的字节数){ctx->rlen = ret;  // 记录读取长度(用于后续回显)// 将读缓冲区的数据复制到写缓冲区(准备回显,使用专属缓冲区避免冲突)memcpy(ctx->wbuffer, ctx->rbuffer, ret);// 切换事件类型为"写",并注册写事件(告知内核"可写时发送数据")ctx->event = EVENT_WRITE;set_event_send(&ring, ctx);}}// 处理"写数据"事件(EVENT_WRITE):数据发送完成后触发else if (ctx->event == EVENT_WRITE){ssize_t ret = cqe->res;  // ret是send的返回值:>0为发送字节数;<=0为失败if (ret > 0)  // 数据发送成功{// 切换事件类型为"读",重新注册读事件(等待客户端下一次发送数据)ctx->event = EVENT_READ;set_event_recv(&ring, ctx);}else  // 发送失败(如客户端已关闭){close(ctx->fd);  // 关闭连接socketfree(ctx);       // 释放连接上下文}}// 标记事件已处理:告知内核该CQE可被复用(推进完成队列指针)io_uring_cqe_seen(&ring, cqe);}}// 资源清理(理论上主循环不会退出,实际应用中需在信号处理中调用)io_uring_queue_exit(&ring);  // 销毁io_uring实例(释放内核资源)close(sockfd);               // 关闭监听socketreturn 0;
}

3. test_qps_tcpclient.c

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/errno.h>#define RBUFFER_LENGTH 128 // 512 1024 2048
#define WBUFFER_LENGTH 128// ./test_qps_tcpclient -s 192.168.248.130 -p 8000 -t 50 -c 100 -n 10000// 宏定义:1=非阻塞模式(有超时、休眠时间,测得不准确,好处是不会卡死线程),0=阻塞模式(测得准确,阻塞时CPU会切换到别的线程,但是可能会一直阻塞导致该线程卡死,结果一直出不来)
// 建议使用阻塞模式,测得更准确,遇到线程卡死的情况再选择非阻塞
#define USE_NONBLOCK 0typedef struct test_context_s
{char serverip[16];int port;int threadnum;int connection;int requestion;char buffer[RBUFFER_LENGTH];int conns_per_thread;int failed;pthread_mutex_t mutex; // 线程安全锁
} test_context_t;// 设置非阻塞模式(仅非阻塞模式使用)
int SetNoblock(int fd)
{int flag = fcntl(fd, F_GETFL);return fcntl(fd, F_SETFL, flag | O_NONBLOCK) == 0;
}// 计算时间差(毫秒)
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)// 连接服务器(阻塞/非阻塞模式通用)
int connect_tcpserver(const char *ip, unsigned short port, int timeout_ms)
{int connfd = socket(AF_INET, SOCK_STREAM, 0);if (connfd < 0){perror("socket");return -1;}struct sockaddr_in tcpserver_addr;memset(&tcpserver_addr, 0, sizeof(struct sockaddr_in));tcpserver_addr.sin_family = AF_INET;tcpserver_addr.sin_addr.s_addr = inet_addr(ip);tcpserver_addr.sin_port = htons(port);// 阻塞模式:直接阻塞连接int ret = connect(connfd, (struct sockaddr *)&tcpserver_addr, sizeof(struct sockaddr_in));if (ret != 0){perror("connect (block)");close(connfd);return -1;}
#if USE_NONBLOCKSetNoblock(connfd);
#endifreturn connfd;
}// 发送函数(阻塞/非阻塞模式通用)
int Send(int fd, char *wbuffer, int len, int timeout_ms)
{
#if USE_NONBLOCK// 非阻塞模式:带超时循环发送struct timeval start, now;gettimeofday(&start, NULL);size_t total = 0;while (total < len){gettimeofday(&now, NULL);int elapsed = TIME_SUB_MS(now, start);if (elapsed >= timeout_ms){printf("Send timeout (elapsed: %d ms)\n", elapsed);return -1;}ssize_t sent = send(fd, wbuffer + total, len - total, 0);if (sent < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){usleep(1); // 短暂休眠,减少CPU占用continue;}else{printf("send errno: %d --> %s\n", errno, strerror(errno));return -1;}}if (sent == 0){return -1; // 连接关闭}total += sent;}
#else// 阻塞模式:循环发送直到完成(无超时,依赖系统默认超时)size_t total = 0;while (total < len){ssize_t sent = send(fd, wbuffer + total, len - total, 0);if (sent <= 0){printf("send errno: %d --> %s\n", errno, strerror(errno));return -1;}total += sent;}
#endifreturn 0;
}// 接收函数(阻塞/非阻塞模式通用)
int Recv(int fd, char *rbuffer, int len, int timeout_ms)
{
#if USE_NONBLOCK// 非阻塞模式:带超时循环接收struct timeval start, now;gettimeofday(&start, NULL);size_t total = 0;while (total < len){gettimeofday(&now, NULL);int elapsed = TIME_SUB_MS(now, start);if (elapsed >= timeout_ms){printf("Recv timeout (elapsed: %d ms)\n", elapsed);return -1;}ssize_t ret = recv(fd, rbuffer + total, len - total, 0);if (ret < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){usleep(1); // 短暂休眠,减少CPU占用continue;}else{printf("recv errno: %d --> %s\n", errno, strerror(errno));return -1;}}if (ret == 0){return -1; // 连接关闭}total += ret;}
#else// 阻塞模式:循环接收直到完成(无超时,依赖系统默认超时)size_t total = 0;while (total < len){ssize_t ret = recv(fd, rbuffer + total, len - total, 0);if (ret <= 0){printf("recv errno: %d --> %s\n", errno, strerror(errno));return -1;}total += ret;}
#endifreturn 0;
}// 发送接收并校验
int send_recv_tcppkt(int fd, test_context_t *ctx)
{// 发送超时:5秒,接收超时:5秒(可根据需求调整)int res = Send(fd, ctx->buffer, WBUFFER_LENGTH, 5000);if (res < 0){return -1;}char rbuffer[WBUFFER_LENGTH] = {0};res = Recv(fd, rbuffer, WBUFFER_LENGTH, 5000);if (res < 0){return -1;}if (memcmp(rbuffer, ctx->buffer, WBUFFER_LENGTH) != 0){printf("data mismatch\n");return -1;}return 0;
}// 线程入口:按连接数分摊请求
static void *test_qps_entry(void *arg)
{test_context_t *pctx = (test_context_t *)arg;// 计算当前线程需创建的连接数int conns_per_thread = pctx->conns_per_thread;// 计算每个连接需处理的请求数int reqs_per_conn = pctx->requestion / pctx->connection;int remainder_reqs = pctx->requestion % pctx->connection;for (int conn_idx = 0; conn_idx < conns_per_thread; conn_idx++){// 连接超时:3秒(非阻塞模式生效,阻塞模式依赖系统默认)int connfd = connect_tcpserver(pctx->serverip, pctx->port, 3000);if (connfd < 0){// 连接失败,标记该连接的所有请求为失败int reqs = reqs_per_conn;if (conn_idx < remainder_reqs){reqs += 1;}pthread_mutex_lock(&pctx->mutex);pctx->failed += reqs;pthread_mutex_unlock(&pctx->mutex);continue;}// 处理该连接的所有请求int reqs = reqs_per_conn;if (conn_idx < remainder_reqs){reqs += 1;}for (int i = 0; i < reqs; i++){int res = send_recv_tcppkt(connfd, pctx);if (res != 0){pthread_mutex_lock(&pctx->mutex);pctx->failed++;pthread_mutex_unlock(&pctx->mutex);}}close(connfd);}return NULL;
}int main(int argc, char *argv[])
{int ret = 0;test_context_t ctx = {0};pthread_mutex_init(&ctx.mutex, NULL); // 初始化互斥锁int opt;while ((opt = getopt(argc, argv, "s:p:t:c:n:?")) != -1){switch (opt){case 's': // ipprintf("-s: %s\n", optarg);strcpy(ctx.serverip, optarg);break;case 'p': // portprintf("-p: %s\n", optarg);ctx.port = atoi(optarg);break;case 't': // 线程数printf("-t: %s\n", optarg);ctx.threadnum = atoi(optarg);break;case 'c': // 连接数printf("-c: %s\n", optarg);ctx.connection = atoi(optarg);break;case 'n': // 请求数printf("-n: %s\n", optarg);ctx.requestion = atoi(optarg);break;default:return -1;}}// 初始化发送缓冲区(二进制数据)for (int i = 0; i < WBUFFER_LENGTH; i++){ctx.buffer[i] = 'D';}ctx.buffer[WBUFFER_LENGTH - 1] = '\0'; // 保持与服务器回显一致pthread_t *ptid = malloc(ctx.threadnum * sizeof(pthread_t));if (!ptid){perror("malloc ptid failed");ret = -1;goto clean;}int remainder_conns = ctx.connection % ctx.threadnum;if (remainder_conns == 0){ctx.conns_per_thread = ctx.connection / ctx.threadnum;printf("开始执行:总连接数 %d,线程数 %d → 每个线程基础创建 %d 个连接,回显的数据大小:%d bytes\n",ctx.connection, ctx.threadnum, ctx.conns_per_thread, WBUFFER_LENGTH);}else{printf("无法均匀分配连接数,这里增加 %d 个连接\n", ctx.threadnum - remainder_conns);ctx.connection += ctx.threadnum - remainder_conns;ctx.conns_per_thread = ctx.connection / ctx.threadnum;printf("开始执行:总连接数 %d,线程数 %d → 每个线程基础创建 %d 个连接,回显的数据大小:%d bytes\n",ctx.connection, ctx.threadnum, ctx.conns_per_thread, WBUFFER_LENGTH);}struct timeval tv_begin;gettimeofday(&tv_begin, NULL);for (int i = 0; i < ctx.threadnum; i++){if (pthread_create(&ptid[i], NULL, test_qps_entry, &ctx) != 0){perror("pthread_create failed");ret = -1;for (int j = 0; j < i; j++){pthread_join(ptid[j], NULL);}goto clean;}}for (int i = 0; i < ctx.threadnum; i++){pthread_join(ptid[i], NULL);}struct timeval tv_end;gettimeofday(&tv_end, NULL);int time_used = TIME_SUB_MS(tv_end, tv_begin);printf("模式: %s, 请求数:total: %d, success: %d, failed: %d, time_used: %d ms, qps: %d\n",USE_NONBLOCK ? "非阻塞" : "阻塞",ctx.requestion,ctx.requestion - ctx.failed, ctx.failed, time_used,(time_used > 0) ? ((ctx.requestion - ctx.failed) * 1000 / time_used) : 0);clean:free(ptid);pthread_mutex_destroy(&ctx.mutex);return ret;
}

http://www.dtcms.com/a/561976.html

相关文章:

  • 如何在 Linux 中创建自签名 SSL 证书 ?
  • 安阳做网站公司亚马逊关联乱码店铺怎么处理
  • 网站设计公司发展网站美工培训课程
  • 响应式网站是个坑优秀网页设计网址
  • 如何做聊天网站变更icp备案网站信息查询
  • 机器学习20:自编码器(Auto-Encoder)与扩散模型综述学习
  • Spring Boot解决循环依赖的几种办法
  • 成品网站免费网站下载外贸网站建设 广州
  • 网站开发费用结算wordpress 开发搜索框
  • 网站视觉规范不需要备案如何做网站
  • langchain基础
  • vue2+vuex登录功能
  • fastapi -- 耗时任务处理
  • 网站建设咨询服务企业级服务器配置
  • 做阅读任务挣钱的网站北京网站建设公司哪家最好
  • 零基础学Python_自动补全符号
  • C++14 新特性:更加简洁和高效的编程体验
  • 邹城网站设计百中搜
  • 青海省住房和城乡建设厅官方网站wordpress s.w.org
  • Apollo Planning 模块技术深度解析
  • 哪个网站可以帮助做数学题百度推送
  • 企业网站和信息化建设哪里有网站制作服务
  • 【Linux】深入理解进程(三)(环境变量)
  • 【C学生序号姓名学号年龄降序排序】2022-12-9
  • 平衡二叉树解题思路
  • 电子商务网站应该如何建设四川教育公共信息服务平台
  • 响应式官方网站便宜自适应网站建设厂家
  • 实例016 百元买百鸡问题
  • 硬件-射频学习DAY3——高频电流的“恐深症”:趋肤效应解密
  • Hudi安装部署