当前位置: 首页 > news >正文

C++负载均衡远程调用学习之消息路分发机制

目录

 

1.LARV0.5-TCP_server链接管理的功能实现及测试

2.LARV0.6

3.LARV0.6

4.LARV0.6

5.LARV0.6-tcp_server集成

6.LARV0.6-tcp_server集成消息路由分发机制总结

7.LARV0.6回顾


 

1.LARV0.5-TCP_server链接管理的功能实现及测试

### 16.2 完成Lars Reactor V0.12开发

### server端

```c
#include "tcp_server.h"
#include <string>
#include <string.h>
#include "config_file.h"

tcp_server *server;

//回显业务的回调函数
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    printf("callback_busi ...\n");
    //直接回显
    conn->send_message(data, len, msgid);

    printf("conn param = %s\n", (const char *)conn->param);
}

//打印信息回调函数
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    printf("recv client: [%s]\n", data);
    printf("msgid: [%d]\n", msgid);
    printf("len: [%d]\n", len);
}


//新客户端创建的回调
void on_client_build(net_connection *conn, void *args)
{
    int msgid = 101;
    const char *msg = "welcome! you online..";

    conn->send_message(msg, strlen(msg), msgid);

    //将当前的net_connection 绑定一个自定义参数,供我们开发者使用
    const char *conn_param_test = "I am the conn for you!";
    conn->param = (void*)conn_param_test;
}

//客户端销毁的回调
void on_client_lost(net_connection *conn, void *args)
{
    printf("connection is lost !\n");
}


int main() 
{
    event_loop loop;

    //加载配置文件
    config_file::setPath("./serv.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    short port = config_file::instance()->GetNumber("reactor", "port", 8888);

    printf("ip = %s, port = %d\n", ip.c_str(), port);

    server = new tcp_server(&loop, ip.c_str(), port);

    //注册消息业务路由
    server->add_msg_router(1, callback_busi);
    server->add_msg_router(2, print_busi);

    //注册链接hook回调
    server->set_conn_start(on_client_build);
    server->set_conn_close(on_client_lost);


    loop.event_process();

    return 0;
}
```

2.LARV0.6消息路由分发机制msg_router的定义

### client端

```c
#include "tcp_client.h"
#include <stdio.h>
#include <string.h>


//客户端业务
void busi(const char *data, uint32_t len, int msgid, net_connection  *conn, void *user_data)
{
    //得到服务端回执的数据 
    char *str = NULL;
    
    str = (char*)malloc(len+1);
    memset(str, 0, len+1);
    memcpy(str, data, len);
    printf("recv server: [%s]\n", str);
    printf("msgid: [%d]\n", msgid);
    printf("len: [%d]\n", len);
}


//客户端销毁的回调
void on_client_build(net_connection *conn, void *args)
{
    int msgid = 1; 
    const char *msg = "Hello Lars!";

    conn->send_message(msg, strlen(msg), msgid);
}

//客户端销毁的回调
void on_client_lost(net_connection *conn, void *args) 
{
    printf("on_client_lost...\n");
    printf("Client is lost!\n");
}



int main() 
{

    event_loop loop;

    //创建tcp客户端
    tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");


    //注册消息路由业务
    client.add_msg_router(1, busi);
    client.add_msg_router(101, busi);


    //设置hook函数
    client.set_conn_start(on_client_build);
    client.set_conn_close(on_client_lost);


    //开启事件监听
    loop.event_process();

    return 0;
}
```

​        和之前的client无任何改变。



### 运行结果

3.LARV0.6抽象链接类Net_connection定义

* 服务端:

```c
$ ./server 
msg_router init...
ip = 127.0.0.1, port = 7777
create 0 thread
create 1 thread
create 2 thread
create 3 thread
create 4 thread
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
begin accept
[thread]: get new connection succ!
callback_busi ...
conn param = I am the conn for you!
```

​        会发现我们是可以在callback中拿到conn的属性



# 四、Lars-DNS Service开发

## **1) 简介**

​        负责接收各agent对某modid、cmdid的请求并返回该modid、cmdid下的所有节点,即为agent提供获取路由服务

### 1.1 架构

![3-Lars-dnsserver](./pictures/3-Lars-dnsserver.png)

### **1.2 网络模块**

​    DnsService服务模型采用了one loop per thread TCP服务器,主要是基于Lars-Reactor:

- 主线程Accepter负责接收连接(agent端连接)
- Thread loop们负责处理连接的请求、回复;(agent端发送查询请求,期望获取结果)



### **1.3 双map模型** 

​    DnsServer使用两个map存储路由数据(key = `modid<<32 + cmdid` , value = set of `ip<<32 + port`)

- 一个`RouterDataMap_A`:主数据,查询请求在此map执行
- 另一个`RouterDataMap_B`:后台线程周期性重加载路由到此map,作为最新数据替换掉上一个map

这两个map分别由指针`data_pointer`与`temp_pointer`指向.



### 1.4 Backend Thread守护线程

**dns service还有个业务线程:** 

1、负责周期性(default:1s)检查`RouteVersion`表版本号,如有变化,说明`RouteData`有变更,则重加载`RouteData`表内容;然后将`RouteChange`表中被变更的`modid`取出,根据订阅列表查出`modid`被哪些连接订阅后,向所有工作线程发送任务:要求订阅这些`modid`的连接推送`modid`路由到agent

2、此外,还负责周期性(default:8s)重加载`RouteData`表内容

4.LARV0.6-tcp_server集成路由消息分发机制

### **主业务**

1. 服务启动时,`RouteData`表被加载到`data_pointer`指向的`RouterDataMap_A`中, `temp_pointer`指向的`RouterDataMap_B`为空

          2. 服务启动后,agent发来Query for 请求某`modid/cmdid`,到其所在Thread Loop上,上读锁查询`data_pointer`指向的`RouterDataMap_A`,返回查询结果;
          3. 如果此`modid/cmdid`不存在,则把`agent ip+port`+`moid/cmdid`发送到Backend thread loop1的队列,让其记录到ClientMap

后台线程Backend thread每隔10s清空`temp_pointer`指向的`RouterDataMap_B`,再加载`RouteData`表内容到`temp_pointer`指向的`RouterDataMap_B`,加载成功后交换指针`data_pointer`与`temp_pointer`指针内容,于是完成了路由数据的更新.

## **2) 数据库创建**

* 表`RouteData`: 保存了所有mod路由信息.

| 字段       | 数据类型         | 是否可以为空 | 主键 | 默认 | 附加   | 说明         |
| ---------- | ---------------- | ------------ | ---- | ---- | ------ | ------------ |
| id         | int(10) unsigned | No           | 是   | NULL | 自增长 | 该条数据ID   |
| modid      | int(10) unsigned | No           |      | NULL |        | 模块ID       |
| cmdid      | int(10) unsigned | No           |      | NULL |        | 指令ID       |
| serverip   | int(10) unsigned | No           |      | NULL |        | 服务器IP地址 |
| serverport | int(10) unsigned | No           |      | NULL |        | 服务器端口   |



* 表`RouteVersion`: 当前`RouteData`路由版本号,每次管理端修改某mod的路由,`RouteVersion`表中的版本号都被更新为当前时间戳

| 字段    | 数据类型         | 是否可以为空 | 主键 | 默认 | 附加   |
| ------- | ---------------- | ------------ | ---- | ---- | ------ |
| id      | int(10) unsigned | No           | 是   | NULL | 自增长 |
| version | int(10) unsigned | No           |      | NULL |        |

5.LARV0.6-tcp_server集成

* 表`RouteChange`: 每次管理端修改某mod的路由,会记录本次对哪个mod进行修改(增、删、改),以便指示最新的`RouteData`路由有哪些mod变更了。

| 字段    | 数据类型            | 是否可以为空 | 主键 | 默认 | 附加   |
| ------- | ------------------- | ------------ | ---- | ---- | ------ |
| id      | int(10) unsigned    | No           | 是   | NULL | 自增长 |
| modid   | int(10) unsigned    | No           |      | NULL |        |
| cmdid   | int(10) unsigned    | No           |      | NULL |        |
| version | bigint(20) unsigned | No           |      | NULL |        |



相关创建表格的sql语句如下`lars_dns.sql`

```sql
DROP DATABASE if exists lars_dns;
CREATE DATABASE lars_dns;
USE lars_dns;

DROP TABLE IF EXISTS `RouteData`;
CREATE TABLE `RouteData` (
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `modid` int(10) unsigned NOT NULL,
    `cmdid` int(10) unsigned NOT NULL,
    `serverip` int(10) unsigned NOT NULL,
    `serverport` int(10) unsigned NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=116064 DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `RouteVersion`;
CREATE TABLE RouteVersion (
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `version` int(10) unsigned NOT NULL,
    PRIMARY KEY (`id`)
);
INSERT INTO RouteVersion(version) VALUES(0);

DROP TABLE IF EXISTS `RouteChange`;
CREATE TABLE RouteChange (
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `modid` int(10) unsigned NOT NULL,
    `cmdid` int(10) unsigned NOT NULL,
    `version` bigint(20) unsigned NOT NULL,
    PRIMARY KEY (`id`)
);
```

6.LARV0.6-tcp_server集成消息路由分发机制总结

 

7.LARV0.6回顾

 

 

 

 

相关文章:

  • 近端策略优化PPO详解:python从零实现
  • 哈希表笔记(二)redis
  • 定义一个3D cube,并计算cube每个顶点的像素坐标
  • 怎么查看数据库容量
  • PDF本地化开源项目推荐
  • Android面试总结之GC算法篇
  • 使用scipy求解优化问题
  • 【经管数据】A股上市公司资产定价效率数据(2000-2023年)
  • 数据结构学习之顺序表
  • 【Quest开发】极简版!透视环境下抠出身体并能遮挡身体上的服装
  • Python实例题:Python实现Python解释器
  • Three.js + React 实战系列-3D 个人主页:构建 About 组件 (响应式 + 互动 + 动效)✨
  • 华为云服务器VoceChat在线聊天室部署
  • 注册登录页面项目
  • Ocelot\Consul\.NetCore的微服务应用案例
  • 构建现代分布式云架构的三大支柱:服务化、Service Mesh 与 Serverless
  • 微软推出数款Phi 4“开放式”人工智能模型
  • 系统架构设计师:设计模式——创建型设计模式
  • 微软与Meta大幅增加人工智能基础设施投入
  • 普通 html 项目也可以支持 scss_sass
  • 长三角议事厅| AI作曲时代:长三角如何奏响数字音乐乐章
  • 保险经纪公司元保在纳斯达克挂牌上市,去年净赚4.36亿元
  • 新华时评:需要“重新平衡”的是美国心态
  • 苹果手机为何无法在美制造?全球供应链难迁移
  • 光明日报社论:用你我的匠心,托举起繁盛的中国
  • 违规行为屡禁不止、责任边界模糊不清,法治日报:洞穴探险,谁为安全事故买单?