当前位置: 首页 > news >正文

Linux网络编程day7 线程池and UDP

线程池

typedef struct{void*(*function)(void*);        //函数指针,回调函数void*arg;                       //上面函数的参数
}threadpool_task_t;                 //各子线程任务的结构体/*描述线程池相关信息*/struct threadpool_t{pthread_mutex_t lock;           // 用于锁住本结构体pthread_mutex_t thread_counter; //记录忙状态线程个数的锁 -- bust_thr_numpthread_cond_t queue_not_full;  //当任务队列满时 , 添加任务的线程阻塞 , 等待此条件变量pthread_cond_t queue_not_empty; //任务队列不为空时,通知等待任务的线程pthread_t *threads;             //存放线程池中每个线程的Tid . 数组pthread_t adjust_tid;           //存管理线程的tidthreadpool_task_t *task_queue;  //任务队列--数组首地址int min_thr_num;                //线程池最小线程数int max_thr_num;                //线程池最大线程数int live_thr_num;               //当前存活线程个数int busy_thr_num;               //忙状态线程个数int wait_exit_thr_num;          //要销毁的线程个数int queue_front;                //task_queue队头下标int queue_rear;                 //task_queue队尾下标int queue_size;                 //task_queue队中实际任务数int queue_max_size;             //task_queue队列可容纳任务数上限int shutdown;                   //标志位,线程池使用状态,true or false
};

线程池模块分析

1、main():创建线程池

                   向线程池中添加任务,借助回调处理任务

                   销毁线程池

int main(void)
{//threadpool_t * threadpool_create(int min_thr_num , int max_thr_num , int queue_max_size);threadpool_t *thp = threadpool_create(3 , 100 , 100);//创建线程池,最大数量100,最小数量3 ,任务队列最
大容量100.printf("pool inited");int num[20] , i;  //模拟客户端向服务器发送数据等场景for(i = 0; i < 20 ; i++){num[i] = i;printf("add task %d\n" , i);//int threadpool_add(threadpool_t *pool , void*(*function)(void*arg) , void arg);threadpool_add(thp , process , (void*)&num[i]); //向线程池中添加任务}sleep(10);  //等待子线程完成任务threadpool_destroy(thp);return 0 ;
}

2、pthreadpool_create:创建线程池结构体指针

                                        初始化线程池结构体中N个成员变量

                                        创建N个任务线程

                                        创建1个管理者线程

                                        失败时 , 释放空间

threadpool_t* threadpool_create(int min_thr_num , int max_thr_num , int queue_max_size)
{int i ;struct threadpool_t *pool = NULL; // 线程池 结构体do{if((pool = (struct threadpool_t*)malloc(sizeof(struct threadpool_t))) == NULL){printf("malloc threadpool fail");break;}pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num; //活着的线程数 初值=最小线程数pool->wait_exit_thr_num = 0;pool->queue_size = 0; //有0个产品pool->queue_max_size = queue_max_size;//最大任务队列数pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false; // 不关闭线程池/*根据最大线程上线数 , 给工作线程数组开辟空间,清零*/pool->threads = (pthread*)malloc(sizeof(pthread_t)*max_thr_num);if(pool->threads == NULL){printf("malloc threads fail");break;}memset(pool->threads , 0 , sizeof(pthread_t)*max_thr_num);/*给任务队列开辟空间 */pool->task_queue = (threadpool_task_t*)malloc(sizeof(threadpool_task_t)*queue_max_size);if(pool->task_queue == NULL){printf("malloc task_queue fail");break;}/*初始化互斥锁、条件变量 , 使用init动态初始化 , 加上进行返回值判断*/if(pthread_mutex_init((&pool->lock) , NULL) != 0|| pthread_mutex_init(&(pool->thread_counter) , NULL) != 0|| pthread_cond_init(&(pool->queue_not_empty) , NULL) != 0|| pthread_cond_init(&(pool->queue_not_full) , NULL) != 0){printf("init the lock or cond fail");break;}/*启动min_thr_num个work thread*/for(i = 0 ; i < min_thr_num ; i++){pthread_create(&(pool->threads[i]) , NULL , threadpool_thread , (void*)pool);//pool指向当前线>程池printf("stat thread 0x%x...\n" , (unsigned int)pool->threads[i]);}pthread_create(&(pool->adjust_tid) , NULL , adjust_thread , (void*)pool);//创建管理者线程return pool;}while(0);threadpool_free(pool); // 前面代码调用失败时,释放pool空间return NULL;
}

3、threadpool_thread():进入子线程回调函数。

                                         接收参数(void*)arg

                                         加锁--》lock--》整个结构体的锁

                                         判断条件变量--》wait

/* 线程池中各个工作线程 */
void* threadpool_thread(void* threadpool)
{struct threadpool_t *pool = (struct threadpool_t*)threadpool;threadpool_task_t task;//任务队列对象while(true){/*刚创建出线程,等待任务队列里面有队列 ,否则阻塞等待任务队列李有任务后再唤醒接收任务*/pthread_mutex_lock(&(pool->lock));//queue_size = 0说明没有任务,调用wait函数阻塞在条件变量上,若有任务,跳过whilewhile((pool->queue_size == 0) && (!pool->shutdown)){printf("thread 0x%x is waiting\n" , (unsigned int)pthread_self());pthread_cond_wait(&(pool->queue_not_empty) , &(pool->lock));//清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程if(pool->wait_exit_thr_num > 0 ){pool->wait_exit_thr_num--;//如果线程池里线程个数大于最小值时可以结束当前线程if(pool->live_thr_num > pool->min_thr_num){printf("thread 0x%x is exiting\n" , (unsigned int)pthread_self());pool_live_thr_num--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);}}//指定true,要关闭线程池里的每个线程,自行退出-->销毁线程池if(pool->shutdown){pthread_mutex_unlock(&(pool->lock));printf("thread 0x%x is exiting\n" , (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL); // 线程自行结束}//从任务队列获取任务,出队操作task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; //出队,模拟环形pool->queue_size--;//通知可以有新的任务添加进来pthread_cond_broadcast(&(pool->queue_not_full));//任务取出后立即将线程池锁释放pthread_mutex_unlock(&(pool->lock));//执行任务printf("thread 0x%x stat working\n" , (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));    //忙状态线程数变量锁pool->busy_thr_num++;                           //忙状态线程数+1pthread_mutex_unlock(&((pool->thread_counter));(*(task.function))(task.arg);//执行回调函数//任务结束处理printf("thread 0x%x end working\n" , (unsigned int)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--;                           //处理掉任务,忙状态线程数-1pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);
}

4、adjust_thread():进入管理者线程回调函数                                 

                                 循环10s执行一次

                                 接收参数(void*)arg

                                 加锁--》lock--》整个结构体的锁

                                 获取管理线程时需要用到的变量:live busy queue task

                                 根据既定算法,使用上述3变量判断是否应该创建、销毁线程池中的指定步长的线程。

void* adjust_thread(void* threadpool)
{int i ;struct threadpool_t *pool = (struct threadpool_t*)threadpool;while(!pool->shutdown){sleep(DEFAULT_TIME); //定时对线程池管理pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size;int live_thr_num  = pool->live_thr_num;pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num;pthread_mutex_unlock(&(pool->pthread_counter));//创建新线程,任务数大于最小线程池个数,且存活线程数少于最大线程数if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num){pthread_mutex_lock(&(pool->lock));int add = 0;//一次增加DEFAULT_THREAD个线程for(i = 0 ; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY&& pool->live_thr_num < pool_max_thr_num ; i++){pthread_create(&(pool->thread[i]) , NULL , threadpool_thread , (void*)pool);add++;pool->live_thr_num++;}pthread_mutex_unlock(&(pool->lock));}if((busy_thr_num *2) < live_thr_num && live_thr_num > pool->min_thr_num){pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;pthread_mutex_unlock(&(pool->lock));for(i = 0 ; i < DEFAULT_THREAD_VARY ; i++){pthread_cond_signal(&(pool->queue_not_empty));}}}return NULL;
}

5、threadpool_add:模拟产生任务 num[20]

                                  设置回调函数,处理任务sleep(1)代表处理完成

                                  初始化任务队列结构体成员 回调函数和arg

                                  利用环形队列机制实现添加任务,借助队尾指针

                                  唤醒阻塞在条件变量上的线程

//线程池中的线程,模拟处理业务
void* process(void*arg)
{printf("thread 0x%x working on task %d\n" , (unsigned int)pthread_self() , (int)arg);sleep(1);printf("task %d is end\n" , (int)arg);return NULL
}
int threadpool_add(struct threadpool_t *pool , (void*)(**function)(void*arg) , (void*)arg)
{pthread_mutex_lock(&(pool->lock));//为真 , 队列已满 , 调用wait阻塞while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)){pthread_cond_wait(&(pool->queue_not_full) , &(pool->lock));}if(pool->shutdown){pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0 ;}//清空工作线程 调用的回调函数 的参数if(pool->task_queue[pool->queue_rear].arg != NULL){pool->task_queue[pool->queue_rear].arg = NULL;}//添加任务到任务队列pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;//队尾指针移动,模拟环形pool->queue_size++;//向任务队列中添加一个任务//添加完任务后,队列不为空,唤醒线程池中等待处理任务的线程pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0 ;
}

6、从3中wait之后执行,处理任务:获取任务处理回调函数及参数

                        利用环形队列机制实现处理任务,借助队头指针

                        唤醒阻塞在条件变量上的server

                        修改忙线程数量++

                        执行处理任务线程

                        修改忙线程数量--

7、创建和销毁线程:管理者线程根据上述三个参数判断是否创建、销毁

                                   满足创建条件pthread_create()回调任务线程函数

                                   满足销毁条件wait_exit_thr_num赋值,signal给阻塞在条件变量上的线程发送假条件满足信号,跳转至wait阻塞,阻塞线程会被假信号唤醒,使用pthread_exit。

int threadpool_destroy(threadpool_t *pool)
{int i;if(pool == NULL)return -1;pool->shutdown = true;pthread_join(pool->adjust_tid , NULL);for(i = 0 ; i < pool->live_thr_num ; i++){pthread_cond_broadcast(&(pool->queue_not_empty));}for(i = 0; i < pool->live_thr_num ; i++){pthread_join(pool->threads[i] , NULL);}threadpool_free(pool);return 0;
}
int threadpool_free(threadpool_t *pool)
{if(pool == NULL)return -1;if(pool->task_queue)free(pool->tast_queue);if(pool->threads){free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_full));pthread_cond_destroy(&(pool->queue_not_empty));}free(pool);pool = NULL;return 0;
}

UDP服务器

TCP通信和UDP通信的优缺点

TCP

面向连接的,可靠数据包传输。对于不稳定的网络层,采取完全弥补的通信方式,丢包重传。

优点:稳定 数据流量稳定、速度稳定、顺序

缺点:传输速度慢、效率低,资源开销大。

使用场景:数据完整要求性较高,不追求效率

                  大数据传输、文件传输。

UDP

无连接的,不可靠的数据报传递。对于不稳定的网络层,采取完全不弥补的通信方式,默认还原网络状况。

优点:传输速度快,效率高,资源开销小。

缺点:不稳定 数据流量、速度不稳定,顺序不稳定

使用场景:对时效性要求较高场合。稳定性其次。

                  游戏、视频会议、视频电话。     

----腾讯、华为、阿里 -- 应用层添加数据校验协议,弥补UDP的不足

UDP实现的C/S模型

无三次握手建立连接,故没有accept()、connect()

recv()/send()只能用于TCP通信

server

server:
lfd = socket(AF_INET , SOCK_DGRAM , 0);   SOCK_DGRAM--->报式协议
bind();
listen(); ----可有可无
while(1){  //不使用read函数recvfrom() //涵盖accept函数中的传出地址结构sendto();
}
close();

client

cfd = socket(AF_INET , SOCK_DGRAM , 0);sendto("服务器地址结构" , 地址结构大小)recvfrom()
写屏幕
close()

recvfrom函数

ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);
socket:lfd
buf:缓冲区地址
len:缓冲区大小
flags:0
src_addr:传出参数,传出对端地址结构
src_addr:传入传出返回值:成功接收数据字节数
失败-1 errno   0对端关闭

sendto函数                 

ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
socket:套接字
buf:存储数据的缓冲区
len:数据长度
flags:0
dest_addr:传入参数,目标地址结构
src_addr:地址结构长度返回值:成功写出数据字节数
失败-1 errno   

udp server端模型

#include<stdlib.h>
#include<stdio.h>
#include<unistd.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<ctype.h>
#define SERV_PORT 9004
void sys_err(char*s)
{perror(s);exit(1);
}int main(int argc , char*argv[])
{int sockfd , i , n;char buf[BUFSIZ] , str[INET_ADDRSTRLEN];struct sockaddr_in serv_addr , clit_addr;socklen_t clitlen;bzero(&serv_addr , sizeof(serv_addr));serv_addr.sin_family = AF_INET;serv_addr.sin_port = htons(SERV_PORT);serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);sockfd  = socket(AF_INET , SOCK_DGRAM , 0);if(sockfd == -1)sys_err("socket error");bind(sockfd , (struct sockaddr*)&serv_addr , sizeof(serv_addr));printf("Accepting connections-----");while(1){clitlen = sizeof(clit_addr);n = recvfrom(sockfd , buf , strlen(buf) , 0 , (struct sockaddr*)clit_addr, &clitlen);if(n == -1)sys_err("recvfrom error");printf("received from %s at port%d\n" , inet_ntop(AF_INET , &clit_addr.sin_addr , str , sizeof(str)),ntohs(clit_addr.sin_port));for(i = 0 ; i < n ; i++)buf[i] = toupper(buf[i]);n = sendto(sockfd , buf , n , 0 , (struct sockaddr*)&clit_addr , sizeof(clit_addr));if(n == -1)sys_err("sendto error");}close(sockfd);return 0 ;
}

udp client端模型

#include<stdlib.h>
#include<stdio.h>
#include<unistd.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<ctype.h>
#define SERV_PORT 9004void sys_err(char*s)
{perror(s);exit(1);
}int main(int argc , char*argv[])
{int sockfd , i , n;char buf[BUFSIZ] ;struct sockaddr_in serv_addr;bzero(&serv_addr , sizeof(serv_addr));serv_addr.sin_family = AF_INET;serv_addr.sin_port = htons(SERV_PORT);inet_pton(AF_INET , "xx.xx.x.xxx" , &serv_addr.sin_addr.s_addr);sockfd  = socket(AF_INET , SOCK_DGRAM , 0);if(sockfd == -1)sys_err("socket error");while((fgets = (buf , BUFSIZ , stdin)) != NULL){n = sendto(sockfd , buf , strlen(buf) , 0 ,(struct sockaddr*)&serv_addr , sizeof(serv_addr));if(n == -1)sys_err("sendto error");n = recvfrom(sockfd , buf , BUFSIZ , 0 , NULL , 0);if(n == -1)sys_err("recvfrom error"); write(STDOUT_FILENO , buf , n); }close(sockfd);return 0 ;
}

相关文章:

  • 只出现一次的数字(暴力、哈希查重、异或运算)
  • 交流中的收获-250508
  • 云手机虚拟地址技术的运营场景
  • FreeRTOS如何检测内存泄漏
  • ConcurrentHashMap解析
  • Java高频面试之并发编程-14
  • 设备管理系统深度测评:如何用 AI 知识库实现故障智能诊断?
  • 沃伦森电容器支路阻抗特性监控系统 电容器组智能健康管理专家
  • 模拟内存管理
  • 工程管理系统简介 工程管理系统源码 java工程管理系统 工程管理系统功能设计 从架构到实操
  • 若依框架Ruoyi-vue整合图表Echarts中国地图标注动态数据
  • TCP/IP和OSI对比
  • 果汁厂通信革新利器:Ethernet/IP转CANopen协议网关
  • 网盘解析工具更新,支持UC网盘!!
  • 从艾米・阿尔文看 CTO 的多面特质与成长路径
  • 使用DEEPSEEK快速修改QT创建的GUI
  • LLM 论文精读(三)Demystifying Long Chain-of-Thought Reasoning in LLMs
  • 深度学习笔记41_调用Gensim库训练Word2Vec模型
  • 什么是:Word2Vec + 余弦相似度
  • 又双叒叕想盘一下systemd
  • 见微知沪|优化营商环境,上海为何要当“细节控”自我加压?
  • 【社论】以法治力量促进民企长远健康发展
  • 国防部:正告菲方停止以任何方式冲撞中方核心利益
  • 深入贯彻中央八项规定精神学习教育中央第一指导组指导督导河北省见面会召开
  • 美联储宣布维持基准利率不变
  • 长安汽车辟谣作为二级企业并入东风集团:将追究相关方责任