C++负载均衡远程调用学习之消息队列与线程池
目录
1.昨日回顾
2.单线程的多路IO服务器模型和多线程模型区别
3.服务器的集中并发模式
4.LARSV0.8-task_msg消息队列任务数据类型
5.LARSV0.8--thread_queue消息队列的发送和接收流
6.LARSV0.8-thread_pool线程池的实现
7.LARSV0.8-thread_pool线程池的实现
8.LARSV0.8-处理新链接任务的函数业务实现
9.LARSV0.8-线程池继承到tcp_server中
10.LARSV0.8-启用线程池测试
11.LARSV0.8-限制链接数量的溢出bug问题
1.昨日回顾
2.单线程的多路IO服务器模型和多线程模型区别
## 1 server(TCP服务)
### API-1 创建一个服务器
```c++
tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
```
`loop`: 事件监听机制堆
`ip`: 服务器监听地址
`port`: 服务器监听端口
**example**
```c++
#include "tcp_server.h"
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
loop.event_process();
return 0;
}
```
3.服务器的集中并发模式
### API-2 添加路由业务
**业务回调函数方法原型**
```c++
typedef void msg_callback(const char *data, uint32_t len, int msgid, net_connection* conn, void *user_data);
```
`data`:需要处理的业务数据
`len`:需要处理的业务数据长度
`msgid`:消息ID
`conn`:需要处理数据的当前链接
`user_data`:开发者额外的自定义参数
4.LARSV0.8-task_msg消息队列任务数据类型
**注册路由业务**
tcp_server对象的方法。
```c++
void tcp_server::add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL);
```
`msgid`:消息ID
`cb`:开发自定义的路由回调业务
`user_data`:开发者额外的自定义参数
5.LARSV0.8--thread_queue消息队列的发送和接收流
**example**
```c++
#include "tcp_server.h"
//定义一个回显的业务
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("callback busi is called!!!!!\n");
//直接将数据发回去
conn->send_message(data, len, msgid);
}
//定义一个打印业务
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("print busi is called!\n");
printf("recv from client : [%s]\n", data);
printf("msgid = %d\n", msgid);
printf("len = %d\n", len);
}
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
//注册一些回调方法
server.add_msg_router(1, callback_busi);
server.add_msg_router(2, print_busi);
loop.event_process();
return 0;
}
```
6.LARSV0.8-thread_pool线程池的实现
### API-3 注册链接创建/销毁成功业务
**回调业务Hook函数原型**
```c++
typedef void (*conn_callback)(net_connection *conn, void *args);
```
`conn`:需要处理数据的当前链接
`args`:开发者额外的自定义参数
**新建连接成功之后Hook注册**
```c++
static void set_conn_start(conn_callback cb, void *args = NULL);
```
`cb`:回调业务Hook函数
`args`:开发者额外的自定义参数
7.LARSV0.8-thread_pool线程池的实现
**销毁连接之前Hookl注册**
```c++
static void set_conn_close(conn_callback cb, void *args = NULL);
```
`cb`:回调业务Hook函数
`args`:开发者额外的自定义参数
**example**
```c++
#include "tcp_server.h"
#include "string.h"
//打印业务
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("recv from client : [%s]\n", data);
printf("msgid = %d\n", msgid);
printf("len = %d\n", len);
}
//新客户端创建成功之后的回调
void on_client_build(net_connection *conn, void *args)
{
printf("===> on_client_build is called!\n");
}
//客户端断开之前的回调
void on_client_lost(net_connection *conn, void *args)
{
printf("===> on_client_lost is called!\n");
}
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
//注册一些回调方法
server.add_msg_router(2, print_busi);
//注册链接hook函数
server.set_conn_start(on_client_build);
server.set_conn_close(on_client_lost);
loop.event_process();
return 0;
}
```
8.LARSV0.8-处理新链接任务的函数业务实现
## 2 connection链接
### API-1 发送数据
```c++
int send_message(const char *data, int msglen, int msgid)
```
`data`:需要处理的业务数据
`msglen`:需要处理的业务数据长度
`msgid`:消息ID
**example**
```c++
#include "tcp_server.h"
//定义一个回显的业务
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
//直接将数据发回去
conn->send_message(data, len, msgid);
}
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
//注册一些回调方法
server.add_msg_router(1, callback_busi);
loop.event_process();
return 0;
}
```
9.LARSV0.8-线程池继承到tcp_server中
## 3 客户端(C/C++)
### API-1创建一个客户端
```c++
tcp_client::tcp_client(event_loop *loop, const char *ip, unsigned short port);
```
`loop`: 事件监听机制堆
`ip`: 服务器监听地址
`port`: 服务器监听端口
**example**
```c++
#include "tcp_client.h"
//注册一个客户端处理服务器返回数据的回调业务
void busi(const char *data, uint32_t len, int msgid, net_connection *client, void *user_data)
{
printf("recv from server\n");
printf("msgid = %d\n", msgid);
printf("len = %d\n", len);
printf("data = %s\n", data);
conn->send_message(data, len, msgid);
}
int main()
{
event_loop loop;
tcp_client *client = new tcp_client(&loop, "127.0.0.1", 7777);
//注册一个回调业务
client->add_msg_router(1, busi);
loop.event_process();
}
```
### API-2 清除一个客户端
```c++
tcp_client::clean_conn();
```
10.LARSV0.8-启用线程池测试
### API-3 注册链接创建/销毁成功业务
**回调业务Hook函数原型**
```c++
typedef void (*conn_callback)(net_connection *conn, void *args);
```
`conn`:需要处理数据的当前链接
`args`:开发者额外的自定义参数
**新建连接成功之后Hook注册**
```c++
static void set_conn_start(conn_callback cb, void *args = NULL);
```
`cb`:回调业务Hook函数
`args`:开发者额外的自定义参数
**销毁连接之前Hookl注册**
```c++
static void set_conn_close(conn_callback cb, void *args = NULL);
```
`cb`:回调业务Hook函数
`args`:开发者额外的自定义参数
**example**
```c++
#include "tcp_client.h"
#include <string.h>
//注册一个客户端处理服务器返回数据的回调业务
void callback_busi(const char *data, uint32_t len, int msgid, net_connection* conn, void *user_data)
{
//将数据写回去
conn->send_message(data, len, msgid);
}
//打印业务
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("print busi is called!\n");
printf("recv from server : [%s]\n", data);
printf("msgid = %d\n", msgid);
printf("len = %d\n", len);
}
//客户端创建连接之后hook业务
void on_client_build(net_connection *conn, void *args)
{
printf("==> on_client build !\n");
//客户端一旦连接成功 就会主动server发送一个msgid = 1 的消息
int msgid = 1;
const char *msg = "hello Lars";
conn->send_message(msg, strlen(msg), msgid);
}
11.LARSV0.8-限制链接数量的溢出bug问题
//客户端销毁连接之前的hook业务
void on_client_lost(net_connection *conn, void *args)
{
printf("==> on_client lost !\n");
}
int main()
{
event_loop loop;
tcp_client *client = new tcp_client(&loop, "127.0.0.1", 7777);
client->add_msg_router(1, print_busi);
client->add_msg_router(2, callback_busi);
client->add_msg_router(200, print_busi);
//注册hook函数
client->set_conn_start(on_client_build);
client->set_conn_close(on_client_lost);
loop.event_process();
}
```