当前位置: 首页 > news >正文

TCP实现线程池竞争任务

服务端:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<netinet/ip.h>
#include<strings.h>
#include<unistd.h>
#include<ctype.h>
#include<arpa/inet.h>
#include<stdlib.h>
#include<string.h>
#include<sys/wait.h>
#include<pthread.h>#define SERV_PORT 8000
#define MAXLINE 80//打印报错信息
#define prrexit(msg){ \perror(msg); \exit(1); \}typedef struct Task{int fd;struct Task  *next;
}Task;
//任务池子,队列
typedef struct Task_pool{Task *head;Task *tail;pthread_mutex_t lock;pthread_cond_t havetask;
}Task_pool;Task_pool *task_pool_init(){Task_pool *tp=(Task_pool *)malloc(sizeof(Task_pool));tp->head=NULL;tp->tail=NULL;pthread_mutex_init(&tp->lock,NULL);pthread_cond_init(&tp->havetask,NULL);return tp;
}void task_pool_push( Task_pool *tp,int fd){pthread_mutex_lock(&tp->lock);Task *t=(Task *)malloc(sizeof(Task));t->fd=fd;t->next=NULL;//两种情况if(!tp->tail){tp-> head=tp->tail=t;}else{tp->tail->next=t;tp-> tail=t;}pthread_cond_broadcast(&tp->havetask);pthread_mutex_unlock(&tp->lock);
}Task task_pool_pop(Task_pool *tp){pthread_mutex_lock(&tp->lock);//为什么不能用ifwhile(tp->head==NULL){pthread_cond_wait(&tp->havetask,&tp->lock);}Task tmp,*k;k=tp->head;tmp=*k;tp->head=tp->head->next;//队列一开始为空的情况下if(!tp->head){tp->tail=NULL;}free(k);pthread_mutex_unlock(&tp->lock);return tmp;
}void task_pool_free(Task_pool *tp){pthread_mutex_lock(&tp->lock);Task *p=tp->head,*k;while(p){k=p;p=p->next;free(k);}tp->head=NULL;pthread_mutex_unlock(&tp->lock);pthread_mutex_destroy(&tp->lock);pthread_cond_destroy(&tp->havetask);free(tp);return ;
}//子线程
void *up_server(void *arg){//获取自己的线程id,自我释放pthread_detach(pthread_self());//进行安全的类型转换//  int connfd=(int)(intptr_t)arg;char buff[MAXLINE];int n,i;Task_pool *tp=arg;while(1){ Task tmp=task_pool_pop(tp);int connfd=tmp.fd;printf("get task fd=%d\n",connfd);while(1){n=read(connfd,buff,MAXLINE);if(n<=0){perror("read error  or connections closed");break;}buff[n]='\0';//添加字符串终止符write(1,buff,n);if(strncmp(buff,"quit",4)==0){break;}for(i = 0; i < n ; i++)buff[i]=toupper(buff[i]);write(connfd,buff,n);}printf("finish task fd=%d\n",connfd);close(connfd);}//正常退出return (void *)0;
}int main(){struct sockaddr_in serveraddr,claddr;int listenfd, connfd;socklen_t  claddr_len;// char buff[MAXLINE];char str[INET_ADDRSTRLEN];int n,i;//任务池创建Task_pool *tp=task_pool_init();//多线程   pthread_t  tid;//多少核就多少个//一上来就会打印idfor(i=0;i<4;i++){pthread_create(&tid,NULL,up_server,(void *)(intptr_t)tp);printf("new thread is %#lx\n",tid);}listenfd =socket(AF_INET,SOCK_STREAM,0);if(listenfd<0){prrexit("socket");}//服务器ip地址,端口初始化bzero(&serveraddr,sizeof(serveraddr));serveraddr.sin_family=AF_INET;serveraddr.sin_port = htons(SERV_PORT);serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);if(bind(listenfd,(struct  sockaddr *)&serveraddr,sizeof(serveraddr))<0)prrexit("bind");if(listen(listenfd,3)<0)prrexit("listen");printf("Accepting connections...\n");while(1){claddr_len= sizeof(claddr);connfd=accept(listenfd,(struct sockaddr *)&claddr,&claddr_len);if(connfd<0)prrexit("accept");printf("received from %s:%d\n",inet_ntop(AF_INET,&claddr.sin_addr,str,sizeof(str)),ntohs(claddr.sin_port));/*多进程pid_t pid= fork();if(pid<0){prrexit("fork");}//父进程 :等待 创建连接if(pid > 0){close(connfd);//回收进程资源while(waitpid(-1,NULL,WNOHANG)>0){ };continue;}close(listenfd);*///多线程//  pthread_t  tid;//  pthread_create(&tid,NULL,up_server,(void *)(intptr_t)connfd);//  printf("new thread is %#lx\n",tid);task_pool_push(tp,connfd);}task_pool_free(tp);return 0;
}

客户端:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<netinet/ip.h>
#include<string.h>
#include<unistd.h>
#include<ctype.h>
#include<arpa/inet.h>
#include<stdlib.h>#define SERV_PORT 8000
#define MAXLINE 80int main(){struct  sockaddr_in servaddr;char buff[MAXLINE];int sockfd = socket(AF_INET, SOCK_STREAM,0);if(sockfd < 0){perror("socket");exit(1);}bzero(&servaddr,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_port = htons(SERV_PORT);inet_pton(AF_INET,"127.0.0.1",&servaddr.sin_addr);if(connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0){perror("cnnect");exit(1);}printf("Connect to server .Type 'quit' to exit.\n");//死循环进行读入int n;while((n=read(0,buff,MAXLINE))>0){if(n > 0)buff[n-1] = '\0';//边界检查,只比较前四个字节if(strncmp(buff,"quit",4)==0){printf("Quitting ..\n");write(sockfd,buff,strlen(buff));break;}ssize_t bytes_written =write(sockfd,buff,strlen(buff));if(bytes_written!=strlen(buff)){perror("write error");break;}//读取云服务器响应n = read(sockfd,buff,MAXLINE);if(n<=0){if(n==0){printf("Server closed the connection.\n");}else{perror("read error");}break;}buff[n] = '\0';//确保响应字符串正确终止printf("Server  response: %s\n",buff);printf("Enter next message : ");fflush(stdout);//强制刷新输出缓冲区}  if(n<0){perror("read from stdin error");}close(sockfd);printf("Client exited.\n");return 0;
}

http://www.dtcms.com/a/360737.html

相关文章:

  • LeetCode Hot 100 Python (31~40)
  • 运动规划实战案例 | 基于行人社交模型的移动机器人动态避障(附ROS C++仿真)
  • Linux Tun/Tap 多队列技术
  • 【STM32】贪吃蛇 [阶段2](嵌入式进阶方向)
  • 【含文档+PPT+源码】基于SpringBoot+微信小程序的饮水健康之净水器保养管理系统设计与实现【包运行成功】
  • 【Linux】模拟实现Shell(下)
  • 打开模板打印
  • Ajax笔记(下)
  • 《探索C++11:现代C++语法的性能革新(上篇)》
  • 医疗AI时代的生物医学Go编程:高性能计算与精准医疗的案例分析(八)
  • Redis 核心概念解析:从渐进式遍历、数据库管理到客户端通信协议
  • 《C++进阶之STL》【红黑树】
  • C语言数据结构之双向链表
  • 基于 DNA 的原核生物与微小真核生物分类学:分子革命下的范式重构​
  • 【JavaWeb】之HTML(对HTML细节的一些总结)
  • Notepad++近期版本避雷
  • 【golang长途旅行第35站】Redis
  • Objective-C 的坚毅与传承:在Swift时代下的不可替代性优雅草卓伊凡
  • 云市场周报 (2025.09.01):解读腾讯云向量数据库、阿里云西安节点与平台工程
  • 从零开始的云计算生活——第五十五天,黑云压城,kubernetes模块之网络组件和CoreDNS组件
  • 数组(3)
  • Proteus8 仿真教学全指南:从入门到实战的电子开发利器
  • GitHub 热榜项目 - 日榜(2025-09-01)
  • 基于YOLOv11的脑卒中目标检测及其完整数据集——推动智能医疗发展的新机遇!
  • MySQL下载及安装(Windows 11)
  • 【LeetCode】3524. 求出数组的 X 值 I (动态规划)
  • 【LeetCode 155】—最小栈 - 详解与实现
  • 阿里Qoder怎么样?实测对比TRAE SOLO 和 CodeBuddy IDE
  • 保健品跨境电商:如何筑牢产品质量与安全防线?
  • 数据库事务隔离级别与 MVCC 机制详解