C++负载均衡远程调用学习之消息路分发机制
目录
1.LARV0.5-TCP_server链接管理的功能实现及测试
2.LARV0.6
3.LARV0.6
4.LARV0.6
5.LARV0.6-tcp_server集成
6.LARV0.6-tcp_server集成消息路由分发机制总结
7.LARV0.6回顾
1.LARV0.5-TCP_server链接管理的功能实现及测试
### 16.2 完成Lars Reactor V0.12开发
### server端
```c
#include "tcp_server.h"
#include <string>
#include <string.h>
#include "config_file.h"
tcp_server *server;
//回显业务的回调函数
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("callback_busi ...\n");
//直接回显
conn->send_message(data, len, msgid);
printf("conn param = %s\n", (const char *)conn->param);
}
//打印信息回调函数
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("recv client: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
//新客户端创建的回调
void on_client_build(net_connection *conn, void *args)
{
int msgid = 101;
const char *msg = "welcome! you online..";
conn->send_message(msg, strlen(msg), msgid);
//将当前的net_connection 绑定一个自定义参数,供我们开发者使用
const char *conn_param_test = "I am the conn for you!";
conn->param = (void*)conn_param_test;
}
//客户端销毁的回调
void on_client_lost(net_connection *conn, void *args)
{
printf("connection is lost !\n");
}
int main()
{
event_loop loop;
//加载配置文件
config_file::setPath("./serv.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 8888);
printf("ip = %s, port = %d\n", ip.c_str(), port);
server = new tcp_server(&loop, ip.c_str(), port);
//注册消息业务路由
server->add_msg_router(1, callback_busi);
server->add_msg_router(2, print_busi);
//注册链接hook回调
server->set_conn_start(on_client_build);
server->set_conn_close(on_client_lost);
loop.event_process();
return 0;
}
```
2.LARV0.6消息路由分发机制msg_router的定义
### client端
```c
#include "tcp_client.h"
#include <stdio.h>
#include <string.h>
//客户端业务
void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
//得到服务端回执的数据
char *str = NULL;
str = (char*)malloc(len+1);
memset(str, 0, len+1);
memcpy(str, data, len);
printf("recv server: [%s]\n", str);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
//客户端销毁的回调
void on_client_build(net_connection *conn, void *args)
{
int msgid = 1;
const char *msg = "Hello Lars!";
conn->send_message(msg, strlen(msg), msgid);
}
//客户端销毁的回调
void on_client_lost(net_connection *conn, void *args)
{
printf("on_client_lost...\n");
printf("Client is lost!\n");
}
int main()
{
event_loop loop;
//创建tcp客户端
tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");
//注册消息路由业务
client.add_msg_router(1, busi);
client.add_msg_router(101, busi);
//设置hook函数
client.set_conn_start(on_client_build);
client.set_conn_close(on_client_lost);
//开启事件监听
loop.event_process();
return 0;
}
```
和之前的client无任何改变。
### 运行结果
3.LARV0.6抽象链接类Net_connection定义
* 服务端:
```c
$ ./server
msg_router init...
ip = 127.0.0.1, port = 7777
create 0 thread
create 1 thread
create 2 thread
create 3 thread
create 4 thread
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
begin accept
[thread]: get new connection succ!
callback_busi ...
conn param = I am the conn for you!
```
会发现我们是可以在callback中拿到conn的属性
# 四、Lars-DNS Service开发
## **1) 简介**
负责接收各agent对某modid、cmdid的请求并返回该modid、cmdid下的所有节点,即为agent提供获取路由服务
### 1.1 架构

### **1.2 网络模块**
DnsService服务模型采用了one loop per thread TCP服务器,主要是基于Lars-Reactor:
- 主线程Accepter负责接收连接(agent端连接)
- Thread loop们负责处理连接的请求、回复;(agent端发送查询请求,期望获取结果)
### **1.3 双map模型**
DnsServer使用两个map存储路由数据(key = `modid<<32 + cmdid` , value = set of `ip<<32 + port`)
- 一个`RouterDataMap_A`:主数据,查询请求在此map执行
- 另一个`RouterDataMap_B`:后台线程周期性重加载路由到此map,作为最新数据替换掉上一个map
这两个map分别由指针`data_pointer`与`temp_pointer`指向.
### 1.4 Backend Thread守护线程
**dns service还有个业务线程:**
1、负责周期性(default:1s)检查`RouteVersion`表版本号,如有变化,说明`RouteData`有变更,则重加载`RouteData`表内容;然后将`RouteChange`表中被变更的`modid`取出,根据订阅列表查出`modid`被哪些连接订阅后,向所有工作线程发送任务:要求订阅这些`modid`的连接推送`modid`路由到agent
2、此外,还负责周期性(default:8s)重加载`RouteData`表内容
4.LARV0.6-tcp_server集成路由消息分发机制
### **主业务**
1. 服务启动时,`RouteData`表被加载到`data_pointer`指向的`RouterDataMap_A`中, `temp_pointer`指向的`RouterDataMap_B`为空
2. 服务启动后,agent发来Query for 请求某`modid/cmdid`,到其所在Thread Loop上,上读锁查询`data_pointer`指向的`RouterDataMap_A`,返回查询结果;
3. 如果此`modid/cmdid`不存在,则把`agent ip+port`+`moid/cmdid`发送到Backend thread loop1的队列,让其记录到ClientMap
后台线程Backend thread每隔10s清空`temp_pointer`指向的`RouterDataMap_B`,再加载`RouteData`表内容到`temp_pointer`指向的`RouterDataMap_B`,加载成功后交换指针`data_pointer`与`temp_pointer`指针内容,于是完成了路由数据的更新.
## **2) 数据库创建**
* 表`RouteData`: 保存了所有mod路由信息.
| 字段 | 数据类型 | 是否可以为空 | 主键 | 默认 | 附加 | 说明 |
| ---------- | ---------------- | ------------ | ---- | ---- | ------ | ------------ |
| id | int(10) unsigned | No | 是 | NULL | 自增长 | 该条数据ID |
| modid | int(10) unsigned | No | | NULL | | 模块ID |
| cmdid | int(10) unsigned | No | | NULL | | 指令ID |
| serverip | int(10) unsigned | No | | NULL | | 服务器IP地址 |
| serverport | int(10) unsigned | No | | NULL | | 服务器端口 |
* 表`RouteVersion`: 当前`RouteData`路由版本号,每次管理端修改某mod的路由,`RouteVersion`表中的版本号都被更新为当前时间戳
| 字段 | 数据类型 | 是否可以为空 | 主键 | 默认 | 附加 |
| ------- | ---------------- | ------------ | ---- | ---- | ------ |
| id | int(10) unsigned | No | 是 | NULL | 自增长 |
| version | int(10) unsigned | No | | NULL | |
5.LARV0.6-tcp_server集成
* 表`RouteChange`: 每次管理端修改某mod的路由,会记录本次对哪个mod进行修改(增、删、改),以便指示最新的`RouteData`路由有哪些mod变更了。
| 字段 | 数据类型 | 是否可以为空 | 主键 | 默认 | 附加 |
| ------- | ------------------- | ------------ | ---- | ---- | ------ |
| id | int(10) unsigned | No | 是 | NULL | 自增长 |
| modid | int(10) unsigned | No | | NULL | |
| cmdid | int(10) unsigned | No | | NULL | |
| version | bigint(20) unsigned | No | | NULL | |
相关创建表格的sql语句如下`lars_dns.sql`
```sql
DROP DATABASE if exists lars_dns;
CREATE DATABASE lars_dns;
USE lars_dns;
DROP TABLE IF EXISTS `RouteData`;
CREATE TABLE `RouteData` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`modid` int(10) unsigned NOT NULL,
`cmdid` int(10) unsigned NOT NULL,
`serverip` int(10) unsigned NOT NULL,
`serverport` int(10) unsigned NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=116064 DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `RouteVersion`;
CREATE TABLE RouteVersion (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`version` int(10) unsigned NOT NULL,
PRIMARY KEY (`id`)
);
INSERT INTO RouteVersion(version) VALUES(0);
DROP TABLE IF EXISTS `RouteChange`;
CREATE TABLE RouteChange (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`modid` int(10) unsigned NOT NULL,
`cmdid` int(10) unsigned NOT NULL,
`version` bigint(20) unsigned NOT NULL,
PRIMARY KEY (`id`)
);
```
6.LARV0.6-tcp_server集成消息路由分发机制总结
7.LARV0.6回顾