C++负载均衡远程调用学习之TCP连接封装与TCPCLIENT封装
目录
1.LARSV0.3回顾
2.解决粘包问题的message结构体定义
3.LARSV0.4链接对象的方法和属性的定义
4.LARSv0.4 TCP_conn链接的初始化
5.LARV0.4-tcp_conn处理读事件方法do_read
6.LARV0.4-tcp_conn模块回顾
7.LARV0.4-tcp_send_message主动发包实现
8.LARV0.4-tcp_conn处理写时间方法do_write
9.LARV0.4-tcp_server创建tcp_conn对象
10.LARV0.4-tcp_conn连接封装流程总结
11.LARV0.4-tcp_client属性和方法分析
12.LARV0.4-tcp_client非阻塞客户端套接字创建
13.LARV0.4-tcp_client读写方法实现
14.LARV0.4-tcp_client模块客户端实现问题
15.LARV0.4-tcp_client_send_message无效问题解决
16.LARV0.3-event_loop添加时间buf修复
17.LARV0.4复习
1.LARSV0.3回顾

2.解决粘包问题的message结构体定义
接下来是客户端,客户端我们创建一个Qps结构体,来记录每秒,服务端成功回显数据的次数,来做qps统计,客户端我们可以指定开多少个线程去压测服务端。
 
 > example/qps_test/client.cpp
 
 ```c
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <time.h>
 #include <string>
 
 #include "tcp_client.h"
 #include "echoMessage.pb.h"
 
 struct Qps
 {
     Qps() {
         last_time = time(NULL); 
         succ_cnt = 0;
     }
 
     long last_time;//最后一次发包时间 ms为单位
     int succ_cnt; //成功收到服务器回显的次数
 };
 
 
 //客户端业务
 void busi(const char *data, uint32_t len, int msgid, net_connection  *conn, void *user_data)
 {
     Qps *qps = (Qps*)user_data; //用户参数
 
     qps_test::EchoMessage request, response;
 
     //解析服务端传来的pb数据
     if (response.ParseFromArray(data, len) == false) {
         printf("server call back data error\n");
         return;
     }
 
     //判断数据内容是否回显一致
     if (response.content() == "Hello Lars!!!") {
         //服务器请求响应成功一次
         qps->succ_cnt ++;
     }
 
     long current_time = time(NULL);
     if (current_time - qps->last_time >= 1) {
         //如果当前时间比最后记录时间大于1秒,那么我们进行记录
         printf("---> qps = %d <----\n", qps->succ_cnt);
         qps->last_time = current_time;//记录最后时间
         qps->succ_cnt = 0;//清空成功次数
     }
 
     //给服务端发送新的请求
     request.set_id(response.id() + 1);
     request.set_content(response.content());
 
     std::string requestString;
     request.SerializeToString(&requestString);
 
     conn->send_message(requestString.c_str(), requestString.size(), msgid);
     
 }
3.LARSV0.4链接对象的方法和属性的定义
//创建链接成功之后
 void connection_start(net_connection *client, void *args)
 {
     qps_test::EchoMessage request;
 
     request.set_id(1);
     request.set_content("Hello Lars!!!");
 
     std::string requestString;
 
     request.SerializeToString(&requestString);
 
     int msgid = 1;//与server端的消息路由一致
     client->send_message(requestString.c_str(), requestString.size(), msgid);
 }
 
 
 void *thread_main(void *args)
 {
     //给服务端发包
      
     event_loop loop; 
 
     tcp_client client(&loop, "127.0.0.1", 7777, "qps client");
 
     Qps qps;
 
     //设置回调
     client.add_msg_router(1, busi, (void*)&qps);
 
     //设置链接创建成功之后Hook
     client.set_conn_start(connection_start);
 
     loop.event_process();
 
     return NULL;
 }
 
 
 int main(int argc, char **argv) 
 {
     if (argc == 1) {
         printf("Usage: ./client [threadNum]\n");
         return 1;
     }
 
     //创建N个线程
     int thread_num = atoi(argv[1]);
     pthread_t *tids;
     tids = new pthread_t[thread_num];
 
     for (int i = 0; i < thread_num; i++) {
         pthread_create(&tids[i], NULL, thread_main, NULL);
     }
 
     for (int i = 0; i < thread_num; i++) {
         pthread_join(tids[i], NULL);
     }
 
     return 0;
 }
 ```
4.LARSv0.4 TCP_conn链接的初始化
接下来我们的Makefile
 
 ```makefile
 CXX=g++
 CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecated 
 
 INC=-I../../include
 LIB=-L../../lib -llreactor -lpthread -lprotobuf
 OBJS = $(addsuffix .o, $(basename $(wildcard *.cc)))
 
 all:
         $(CXX) -o server $(CFLAGS)  server.cpp echoMessage.pb.cc $(INC) $(LIB)
         $(CXX) -o client $(CFLAGS)  client.cpp echoMessage.pb.cc $(INC) $(LIB)
 
 clean:
         -rm -f *.o server client    
 ```
 
         记住编译加上`-lprotobuf` 编译的文件加上`echoMessage.pb.cc`文件。
 
 
  
5.LARV0.4-tcp_conn处理读事件方法do_read
## 14.2 并发测试结果
 
         启动服务端,再启动客户端,(注意问了方便看结果,将之前reactor的一些stdout的调试日志都关闭)
 
 看客户端运行结果
 
 ```bash
 $ ./client 1
 msg_router init...
 do_connect EINPROGRESS
 add msg cb msgid = 1
 connect 127.0.0.1:7777 succ!
 ---> qps = 6875 <----
 ---> qps = 8890 <----
 ---> qps = 8354 <----
 ---> qps = 9078 <----
 ---> qps = 8933 <----
 ---> qps = 9043 <----
 ---> qps = 8743 <----
 ---> qps = 9048 <----
 ---> qps = 8987 <----
 ---> qps = 8742 <----
 ---> qps = 8720 <----
 ---> qps = 8795 <----
 ---> qps = 8889 <----
 ---> qps = 9034 <----
 ---> qps = 8669 <----
 ---> qps = 9001 <----
 ---> qps = 8810 <----
 ---> qps = 8850 <----
 ---> qps = 8802 <----
 ---> qps = 9072 <----
 ---> qps = 8853 <----
 ---> qps = 8804 <----
 ---> qps = 8574 <----
 ---> qps = 8645 <----
 ---> qps = 8085 <----
 ---> qps = 8720 <----
 ...
 ```
 
      这里我们客户端是开启一个线程进行测试,平均每秒服务端会响应8700次左右。
 
         这里我简单用两个主机,分别测试了一些数据
 
 **主机1:**
 
 > CPU个数:2个 , 内存: 2GB ,   系统:Ubuntu18.04虚拟机
 
 | 线程数 | QPS     |
 | ------ | ------- |
 | 1      | 0.85w/s |
 | 2      | 1.96w/s |
 | 10     | 4.12w/s |
 | 100    | 4.23w/s |
 | 500    | 3.65w/s |
 
 
 
 **主机2:**
 
 > CPU个数: 24个 , 内存:128GB, 系统: 云主机
 
 | 线程数 | QPS      |
 | ------ | -------- |
 | 1      | 10.86w/s |
 | 3      | 31.06w/s |
 | 5      | 48.12w/s |
 | 8      | 59.79w/s |
 
 
 
         现在我们的Lars-Reactor模块基本已经开完完成了,后续可能会再添加一些模块,为了其他模块方便使用Lars-Reactor的一些接口,我们可以对外提供一个公共的头文件,方便一次性导入
 
 > lars_reactor/include/lars_reactor.h
 
 ```c
 #pragma once
 
 #include "io_buf.h"
 #include "buf_pool.h"
 #include "reactor_buf.h"
 
 #include "net_connection.h"
 #include "tcp_conn.h"
 #include "tcp_server.h"
 #include "tcp_client.h"
 #include "udp_server.h"
 #include "udp_client.h"
 
 #include "message.h"
 #include "task_msg.h"
 #include "event_loop.h"
 #include "thread_pool.h"
 #include "thread_queue.h"
 
 #include "config_file.h"
 ```
6.LARV0.4-tcp_conn模块回顾
## 15) 异步消息任务机制
 
         我们之前在`include/task_msg.h`中, 其中task的消息类型我们只是实现了`NEW_CONN`,目的是`thread_pool`选择一个线程,让一个线程里的`thread_queue`去创建一个连接对象。但是并没有对`NEW_TASK`的任务类型进行定义。这种类型是允许服务端去执行某项具体的业务。并不是根据客户端来消息去被动回复的业务,而是服务端主动发送的业务给到客户端。
 
 ### 15.1 任务函数类型
 
         我们先定义task的回调函数类型
 
 > lars_reactor/include/event_loop.h
 
 ```c
 //...
 
 
 //定义异步任务回调函数类型
 typedef void (*task_func)(event_loop *loop, void *args);
 
 //...
 
 ```
 
        为了防止循环头文件引用,我们把typedef定义在`event_loop.h`中。
 
 > lars_reactor/include/task_msg.h
 
 ```c
 #pragma  once
 #include "event_loop.h"
 
 //定义异步任务回调函数类型
 typedef void (*task_func)(event_loop *loop, void *args);
 
 struct task_msg
 {
     enum TASK_TYPE
     {
         NEW_CONN,   //新建链接的任务
         NEW_TASK,   //一般的任务
     };
 
     TASK_TYPE type; //任务类型
 
     //任务的一些参数
     
     union {
         //针对 NEW_CONN新建链接任务,需要传递connfd
         int connfd;
 
         //针对 NEW_TASK 新建任务, 
         //可以给一个任务提供一个回调函数
         struct {
             task_func task_cb; //注册的任务函数
             void *args;        //任务函数对应的形参
         };
     };
 };
 ```
 
         `task_func`是我们定义的一个任务的回调函数类型,第一个参数当然就是让哪个loop机制去执行这个task任务。很明显,一个loop是对应一个thread线程的。也就是让哪个thread去执行这个task任务。args是`task_func`的函数形参。
 
     
7.LARV0.4-tcp_send_message主动发包实现
### 15.2 event_loop模块添加task任务机制
 
         我们知道,task绑定一个loop,很明显,一个`event_loop`应该拥有需要被执行的task集合。
 
          在这里,我们将event_loop加上已经就绪的task任务的属性
 
 > lars_reactor/include/event_loop.h
 
 ```c
 #pragma once
 /*
  *
  * event_loop事件处理机制
  *
  * */
 #include <sys/epoll.h>
 #include <ext/hash_map>
 #include <ext/hash_set>
 #include <vector>
 #include "event_base.h"
 
 #include "task_msg.h"
 
 #define MAXEVENTS 10
 
 // map: fd->io_event 
 typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
 //定义指向上面map类型的迭代器
 typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
 //全部正在监听的fd集合
 typedef __gnu_cxx::hash_set<int> listen_fd_set;
 
 //定义异步任务回调函数类型
 typedef void (*task_func)(event_loop *loop, void *args);
 
 class event_loop 
 {
 public:
     //构造,初始化epoll堆
     event_loop();
 
     //阻塞循环处理事件
     void event_process();
 
     //添加一个io事件到loop中
     void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);
 
     //删除一个io事件从loop中
     void del_io_event(int fd);
 
     //删除一个io事件的EPOLLIN/EPOLLOUT
     void del_io_event(int fd, int mask);
 
     // ===========================================
     //获取全部监听事件的fd集合
     void get_listen_fds(listen_fd_set &fds) {
         fds = listen_fds;
     }
 
     //=== 异步任务task模块需要的方法 ===
     //添加一个任务task到ready_tasks集合中
     void add_task(task_func func, void *args);
     //执行全部的ready_tasks里面的任务
     void execute_ready_tasks();
     // ===========================================
 private:
     int _epfd; //epoll fd
 
     //当前event_loop 监控的fd和对应事件的关系
     io_event_map _io_evs;
 
     //当前event_loop 一共哪些fd在监听
     listen_fd_set listen_fds;
 
     //一次性最大处理的事件
     struct epoll_event _fired_evs[MAXEVENTS];
 
         // ===========================================
     //需要被执行的task集合
     typedef std::pair<task_func, void*> task_func_pair;
     std::vector<task_func_pair> _ready_tasks;
     // ===========================================
 };
 ```
 
 添加了两个属性:
 
 `task_func_pair`: 回调函数和参数的键值对.
 
 `_ready_tasks`: 所有已经就绪的待执行的任务集合。
8.LARV0.4-tcp_conn处理写时间方法do_write
同时添加了两个主要方法:
 
 `void add_task(task_func func, void *args)`: 添加一个任务到_ready_tasks中.
 
 `void execute_ready_tasks()`:执行全部的_ready_tasks任务。
 
 将这两个方法实现如下:
 
 > lars_reactor/src/event_loop.cpp
 
 ```c
 //...
 
 //添加一个任务task到ready_tasks集合中
 void event_loop::add_task(task_func func, void *args)
 {
     task_func_pair func_pair(func, args);
     _ready_tasks.push_back(func_pair);
 }
 
 //执行全部的ready_tasks里面的任务
 void event_loop::execute_ready_tasks()
 {
     std::vector<task_func_pair>::iterator it;
 
     for (it = _ready_tasks.begin(); it != _ready_tasks.end(); it++) {
         task_func func = it->first;//任务回调函数
         void *args = it->second;//回调函数形参
 
         //执行任务
         func(this, args);
     }
 
     //全部执行完毕,清空当前的_ready_tasks
     _ready_tasks.clear();
 }
 
 //...
 ```
 
 
 
         那么`execute_ready_tasks()`函数需要在一个恰当的时候被执行,我们这里就放在每次event_loop一次`epoll_wait()`处理完一组fd事件之后,触发一次额外的task任务。
 
 > lars_reactor/src/event_loop.cpp
 
 ```c
 //阻塞循环处理事件
 void event_loop::event_process()
 {
     while (true) {
         io_event_map_it ev_it;
 
         int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10);
         for (int i = 0; i < nfds; i++) {
                
             //...
               //...
           
         }
       
         //每次处理完一组epoll_wait触发的事件之后,处理异步任务
         this->execute_ready_tasks();
     }
 }
 ```
9.LARV0.4-tcp_server创建tcp_conn对象
    这里补充一下,因为在task的回调函数中,有形参`event_loop *loop`,可能会使用当前loop中监控的fd信息,所以我们应该给event_loop补充一个获取当前loop监控的全部fd信息的方法
 
 ```c
 class event_loop{
       //...
   
       //获取全部监听事件的fd集合
     void get_listen_fds(listen_fd_set &fds) {
         fds = listen_fds;
     }
   
       //...
 };
 ```
 
 
 
 ### 15.3 thread_pool模块添加task任务机制
 
         接下来我们就要用thread_pool来想每个thread所绑定的event_pool中去发送task任务,很明显thread_pool应该具备能够将task加入到event_pool中的_ready_task集合的功能。
 
 > lars_reactor/include/thread_pool.h
 
 ```c
 #pragma once
 
 #include <pthread.h>
 #include "task_msg.h"
 #include "thread_queue.h"
 
 class thread_pool
 {
 public:
     //构造,初始化线程池, 开辟thread_cnt个
     thread_pool(int thread_cnt);
 
     //获取一个thead
     thread_queue<task_msg>* get_thread();
 
     //发送一个task任务给thread_pool里的全部thread
     void send_task(task_func func, void *args = NULL);
 private:
 
     //_queues是当前thread_pool全部的消息任务队列头指针
     thread_queue<task_msg> ** _queues; 
 
     //当前线程池中的线程个数
     int _thread_cnt;
 
     //已经启动的全部therad编号
     pthread_t * _tids;
 
     //当前选中的线程队列下标
     int _index;
 };
 ```
 
         `send_task()`方法就是发送给线程池中全部的thread去执行task任务.
 
 > lars_reactor/src/thread_pool.cpp
 
 ```c
 void thread_pool::send_task(task_func func, void *args)
 {
     task_msg task;
 
     //给当前thread_pool中的每个thread里的pool添加一个task任务
     for (int i = 0; i < _thread_cnt; i++) {
         //封装一个task消息
         task.type = task_msg::NEW_TASK;
         task.task_cb = func;
         task.args = args;
 
         //取出第i个thread的消息队列
         thread_queue<task_msg> *queue = _queues[i];
 
         //发送task消息
         queue->send(task);
     }
 }
 ```
 
         `send_task()`的实现实际上是告知全部的thread,封装一个`NEW_TASK`类型的消息,通过`task_queue`告知对应的thread.很明显当我们进行 `queue->send(task)`的时候,当前的thread绑定的loop,就会触发`deal_task_message()`回调了。
 
 > lars_reactor/src/thread_pool.cpp
 
 ```c
 /*
  * 一旦有task消息过来,这个业务是处理task消息业务的主流程
  *
  * 只要有人调用 thread_queue:: send()方法就会触发次函数
 */
 void deal_task_message(event_loop *loop, int fd, void *args)
 {
     //得到是哪个消息队列触发的 
     thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args;
 
     //将queue中的全部任务取出来
     std::queue<task_msg> tasks;
     queue->recv(tasks);
 
     while (tasks.empty() != true) {
         task_msg task = tasks.front();
 
         //弹出一个元素
         tasks.pop();
 
         if (task.type == task_msg::NEW_CONN) {
             //是一个新建链接的任务
             //并且将这个tcp_conn加入当当前线程的loop中去监听
             tcp_conn *conn = new tcp_conn(task.connfd, loop);
             if (conn == NULL) {
                 fprintf(stderr, "in thread new tcp_conn error\n");
                 exit(1);
             }
 
             printf("[thread]: get new connection succ!\n");
         }
         else if (task.type == task_msg::NEW_TASK) {
             //===========是一个新的普通任务===============
             //当前的loop就是一个thread的事件监控loop,让当前loop触发task任务的回调
             loop->add_task(task.task_cb, task.args);
             //==========================================
         } 
         else {
             //其他未识别任务
             fprintf(stderr, "unknow task!\n");
         }
 
     }
 }
 ```
10.LARV0.4-tcp_conn连接封装流程总结
    我们判断task.type如果是`NEW_TASK`就将该task加入到当前loop中去.
 
 通过上面的设计,可以看出来,thread_pool的`send_task()`应该是一个对外的开发者接口,所以我们要让服务器的`tcp_server`能够获取到`thread_pool`属性.
 
 > lars_reactor/include/tcp_server.h
 
 ```c
 class tcp_server {
       //...
   
     //获取当前server的线程池
     thread_pool *thread_poll() {
         return _thread_pool;
     }
   
       //...
 };
 ```
 
         ok,这样我们基本上完成的task异步处理业务的机制. 下面我们来测试一下这个功能.
 
 
 
 ### 15.4 完成Lars Reactor V0.11开发
 
 > server.cpp
 
 ```c
 #include "tcp_server.h"
 #include <string>
 #include <string.h>
 #include "config_file.h"
 
 tcp_server *server;
 
 void print_lars_task(event_loop *loop, void *args)
 {
     printf("======= Active Task Func! ========\n");
     listen_fd_set fds;
     loop->get_listen_fds(fds);//不同线程的loop,返回的fds是不同的
 
     //可以向所有fds触发
     listen_fd_set::iterator it;
     //遍历fds
     for (it = fds.begin(); it != fds.end(); it++) {
         int fd = *it;
         tcp_conn *conn = tcp_server::conns[fd]; //取出fd
         if (conn != NULL) {
             int msgid = 101;
             const char *msg = "Hello I am a Task!";
             conn->send_message(msg, strlen(msg), msgid);
         }
     }
 }
 
 //回显业务的回调函数
 void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
 {
     printf("callback_busi ...\n");
     //直接回显
     conn->send_message(data, len, msgid);
 }
 
 //打印信息回调函数
 void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
 {
     printf("recv client: [%s]\n", data);
     printf("msgid: [%d]\n", msgid);
     printf("len: [%d]\n", len);
 }
11.LARV0.4-tcp_client属性和方法分析
//新客户端创建的回调
 void on_client_build(net_connection *conn, void *args)
 {
     int msgid = 101;
     const char *msg = "welcome! you online..";
 
     conn->send_message(msg, strlen(msg), msgid);
 
     //创建链接成功之后触发任务
     server->thread_poll()->send_task(print_lars_task);
 }
 
 //客户端销毁的回调
 void on_client_lost(net_connection *conn, void *args)
 {
     printf("connection is lost !\n");
 }
 
 
 int main() 
 {
     event_loop loop;
 
     //加载配置文件
     config_file::setPath("./serv.conf");
     std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
     short port = config_file::instance()->GetNumber("reactor", "port", 8888);
 
     printf("ip = %s, port = %d\n", ip.c_str(), port);
 
     server = new tcp_server(&loop, ip.c_str(), port);
 
     //注册消息业务路由
     server->add_msg_router(1, callback_busi);
     server->add_msg_router(2, print_busi);
 
     //注册链接hook回调
     server->set_conn_start(on_client_build);
     server->set_conn_close(on_client_lost);
 
 
     loop.event_process();
 
     return 0;
 }
 ```
 
         我们在每次建立连接成功之后,触发任务机制。其中`print_lars_task()`方法就是我们的异步任务。由于是全部thead都出发,所以该方法会被每个thread执行。但是不同的thread中的pool所返回的fd是不一样的,这里在`print_lars_task()`中,我们给对应的客户端做了一个简单的消息发送。
 
         
 
 > client.cpp
 
 ```c
 #include "tcp_client.h"
 #include <stdio.h>
 #include <string.h>
 
 
 //客户端业务
 void busi(const char *data, uint32_t len, int msgid, net_connection  *conn, void *user_data)
 {
     //得到服务端回执的数据 
     char *str = NULL;
     
     str = (char*)malloc(len+1);
     memset(str, 0, len+1);
     memcpy(str, data, len);
     printf("recv server: [%s]\n", str);
     printf("msgid: [%d]\n", msgid);
     printf("len: [%d]\n", len);
 }
 
 //客户端销毁的回调
 void on_client_build(net_connection *conn, void *args)
 {
     int msgid = 1; 
     const char *msg = "Hello Lars!";
 
     conn->send_message(msg, strlen(msg), msgid);
 }
 
 //客户端销毁的回调
 void on_client_lost(net_connection *conn, void *args) 
 {
     printf("on_client_lost...\n");
     printf("Client is lost!\n");
 }
 
 int main() 
 {
     event_loop loop;
     //创建tcp客户端
     tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");
 
     //注册消息路由业务
     client.add_msg_router(1, busi);
     client.add_msg_router(101, busi);
 
     //设置hook函数
     client.set_conn_start(on_client_build);
     client.set_conn_close(on_client_lost);
 
     //开启事件监听
     loop.event_process();
 
     return 0;
 }
 
 ```
12.LARV0.4-tcp_client非阻塞客户端套接字创建
    客户端代码无差别。
 
 编译并运行
 服务端:
 
 ```bash
 $ ./server 
 msg_router init...
 ip = 127.0.0.1, port = 7777
 create 0 thread
 create 1 thread
 create 2 thread
 create 3 thread
 create 4 thread
 add msg cb msgid = 1
 add msg cb msgid = 2
 begin accept
 begin accept
 [thread]: get new connection succ!
 callback_busi ...
 ======= Active Task Func! ========
 ======= Active Task Func! ========
 ======= Active Task Func! ========
 ======= Active Task Func! ========
 ======= Active Task Func! ========
 ```
 
 客户端:
 
 ```c
 $ ./client 
 msg_router init...
 do_connect EINPROGRESS
 add msg cb msgid = 1
 add msg cb msgid = 101
 connect 127.0.0.1:7777 succ!
 recv server: [welcome! you online..]
 msgid: [101]
 len: [21]
 recv server: [Hello Lars!]
 msgid: [1]
 len: [11]
 recv server: [Hello I am a Task!]
 msgid: [101]
 len: [18]
 ```
 
         task机制已经集成完毕,lars_reactor功能更加强大了。
 
 
 
 
 
 ## 16) 链接属性设置功能
 
 ### 16.1 测试链接属性
 
         我们在基于lars_reactor开发的时候,可能有时候需要在写消息回调的时候,希望conn绑定一些属性。现在我们可以配置这种功能到`net_connection`上。
 
 > lars_reactor/include/net_connection.h
 
 ```c
 #pragma once
 
 #include <stdio.h>
 
 /*
  * 
  * 网络通信的抽象类,任何需要进行收发消息的模块,都可以实现该类
  *
  * */
 
 class net_connection
 {
 public:
     net_connection():param(NULL) {}
     //发送消息的接口
     virtual int send_message(const char *data, int datalen, int msgid) = 0;
     virtual int get_fd() = 0;
 
     void *param; //TCP客户端可以 通过该参数传递一些自定义的参数
 };
 
 //创建链接/销毁链接 要触发的 回调函数类型
 typedef void (*conn_callback)(net_connection *conn, void *args);
 ```
 
         这里我们给`net_connection`类添加了一个`param`属性,这样,我们就可以绑定一些开发者自定义的参数和当前链接进行绑定。注意,这里也提供了一个`get_fd()`的纯虚函数,目的是提供conn获取当前fd的数据。
