当前位置: 首页 > news >正文

C++负载均衡远程调用学习之异步消息任务功能与连接属性

目录

1.LarV0.11-异步消息机制的event_loop增添属性分析

2.LARS

3.LarV0.11异步消息发送机制的实现及测试

4.LarV0.11异步消息任务机制bug修复和效果演示

5.LarV0.12链接参数属性的绑定


1.LarV0.11-异步消息机制的event_loop增添属性分析

## 4) 事件触发event_loop

​        接下来我们要尝试添加多路IO的处理机制,当然linux的平台下, 最优的选择就是使用epoll来做,但是用原生的epoll实际上编程起来扩展性不是很强,那么我们就需要封装一套IO事件处理机制。

2.LARS

### 4.1 io_event基于IO事件封装

​        我们首先定义一个IO事件类来包括一个时间需要拥有的基本成员信息.

> lars_reactor/include/event_base.h

```cpp
#pragma once
/*
 * 定义一些IO复用机制或者其他异常触发机制的事件封装
 *
 * */

class event_loop;

//IO事件触发的回调函数
typedef void io_callback(event_loop *loop, int fd, void *args);

/*
 * 封装一次IO触发实现 
 * */
struct io_event 
{
             io_event():read_callback(NULL),write_callback(NULL),rcb_args(NULL),wcb_args(NULL) {}

    int mask; //EPOLLIN EPOLLOUT
    io_callback *read_callback; //EPOLLIN事件 触发的回调 
    io_callback *write_callback;//EPOLLOUT事件 触发的回调
    void *rcb_args; //read_callback的回调函数参数
    void *wcb_args; //write_callback的回调函数参数
};
```

​        一个`io_event`对象应该包含 一个epoll的事件标识`EPOLLIN/EPOLLOUT`,和对应事件的处理函数`read_callback`,`write_callback`。他们都应该是`io_callback`类型。然后对应的函数形参。

3.LarV0.11异步消息发送机制的实现及测试

## 15) 异步消息任务机制

​        我们之前在`include/task_msg.h`中, 其中task的消息类型我们只是实现了`NEW_CONN`,目的是`thread_pool`选择一个线程,让一个线程里的`thread_queue`去创建一个连接对象。但是并没有对`NEW_TASK`的任务类型进行定义。这种类型是允许服务端去执行某项具体的业务。并不是根据客户端来消息去被动回复的业务,而是服务端主动发送的业务给到客户端。

4.LarV0.11异步消息任务机制bug修复和效果演示

### 15.1 任务函数类型

​        我们先定义task的回调函数类型

> lars_reactor/include/event_loop.h

```c
//...


//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);

//...

```

​       为了防止循环头文件引用,我们把typedef定义在`event_loop.h`中。

> lars_reactor/include/task_msg.h

```c
#pragma  once
#include "event_loop.h"

//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);

struct task_msg
{
    enum TASK_TYPE
    {
        NEW_CONN,   //新建链接的任务
        NEW_TASK,   //一般的任务
    };

    TASK_TYPE type; //任务类型

    //任务的一些参数
    
    union {
        //针对 NEW_CONN新建链接任务,需要传递connfd
        int connfd;

        //针对 NEW_TASK 新建任务, 
        //可以给一个任务提供一个回调函数
        struct {
            task_func task_cb; //注册的任务函数
            void *args;        //任务函数对应的形参
        };
    };
};
```

​        `task_func`是我们定义的一个任务的回调函数类型,第一个参数当然就是让哪个loop机制去执行这个task任务。很明显,一个loop是对应一个thread线程的。也就是让哪个thread去执行这个task任务。args是`task_func`的函数形参。

​    

5.LarV0.12链接参数属性的绑定

### 15.2 event_loop模块添加task任务机制

​        我们知道,task绑定一个loop,很明显,一个`event_loop`应该拥有需要被执行的task集合。

​         在这里,我们将event_loop加上已经就绪的task任务的属性

> lars_reactor/include/event_loop.h

```c
#pragma once
/*
 *
 * event_loop事件处理机制
 *
 * */
#include <sys/epoll.h>
#include <ext/hash_map>
#include <ext/hash_set>
#include <vector>
#include "event_base.h"

#include "task_msg.h"

#define MAXEVENTS 10

// map: fd->io_event 
typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
//定义指向上面map类型的迭代器
typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
//全部正在监听的fd集合
typedef __gnu_cxx::hash_set<int> listen_fd_set;

//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);

class event_loop 
{
public:
    //构造,初始化epoll堆
    event_loop();

    //阻塞循环处理事件
    void event_process();

    //添加一个io事件到loop中
    void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);

    //删除一个io事件从loop中
    void del_io_event(int fd);

    //删除一个io事件的EPOLLIN/EPOLLOUT
    void del_io_event(int fd, int mask);

    // ===========================================
    //获取全部监听事件的fd集合
    void get_listen_fds(listen_fd_set &fds) {
        fds = listen_fds;
    }

    //=== 异步任务task模块需要的方法 ===
    //添加一个任务task到ready_tasks集合中
    void add_task(task_func func, void *args);
    //执行全部的ready_tasks里面的任务
    void execute_ready_tasks();
    // ===========================================
private:
    int _epfd; //epoll fd

    //当前event_loop 监控的fd和对应事件的关系
    io_event_map _io_evs;

    //当前event_loop 一共哪些fd在监听
    listen_fd_set listen_fds;

    //一次性最大处理的事件
    struct epoll_event _fired_evs[MAXEVENTS];

        // ===========================================
    //需要被执行的task集合
    typedef std::pair<task_func, void*> task_func_pair;
    std::vector<task_func_pair> _ready_tasks;
    // ===========================================
};
```

添加了两个属性:

`task_func_pair`: 回调函数和参数的键值对.

`_ready_tasks`: 所有已经就绪的待执行的任务集合。



同时添加了两个主要方法:

`void add_task(task_func func, void *args)`: 添加一个任务到_ready_tasks中.

`void execute_ready_tasks()`:执行全部的_ready_tasks任务。

将这两个方法实现如下:

> lars_reactor/src/event_loop.cpp

```c
//...

//添加一个任务task到ready_tasks集合中
void event_loop::add_task(task_func func, void *args)
{
    task_func_pair func_pair(func, args);
    _ready_tasks.push_back(func_pair);
}

//执行全部的ready_tasks里面的任务
void event_loop::execute_ready_tasks()
{
    std::vector<task_func_pair>::iterator it;

    for (it = _ready_tasks.begin(); it != _ready_tasks.end(); it++) {
        task_func func = it->first;//任务回调函数
        void *args = it->second;//回调函数形参

        //执行任务
        func(this, args);
    }

    //全部执行完毕,清空当前的_ready_tasks
    _ready_tasks.clear();
}

//...
```

相关文章:

  • AI优化SEO关键词实践路径
  • 扩散模型:了解ai生图的原理
  • (38)VTK C++开发示例 ---纹理裁剪
  • 如何提升个人的理解能力?
  • 【中间件】brpc_基础_execution_queue
  • HuggingFace常用加载模型方法
  • RabbitMQ 中的六大工作模式介绍与使用
  • Spring IoC 注解式开发全解析
  • linux netlink实现用户态和内核态数据交互
  • 什么是多租户系统
  • 实战探讨:为什么 Redis Zset 选择跳表?
  • UDP / TCP 协议
  • 机器学习入门-线性回归模型/损失函数/梯度下降
  • Java SE(7)——类和对象(二)
  • 深入解析 SqlSugar 与泛型封装:实现通用数据访问层
  • 4.29-4.30 Maven+单元测试
  • Spring MVC @RequestBody 注解怎么用?接收什么格式的数据?
  • 数据赋能(208)——质量管理——及时性原则
  • 单细胞测序数据分析流程的最佳实践
  • 旋转矩阵公式理解
  • 刘诚宇、杨皓宇进球背后,是申花本土球员带着外援踢的无奈
  • 上海虹桥机场至北京首都机场快线试运行跨航司自愿签转服务
  • 解放军仪仗司礼大队仪仗分队参加纪念苏联伟大卫国战争胜利80周年阅兵活动
  • 李强签署国务院令,公布修订后的《中华人民共和国植物新品种保护条例》
  • 2025年第一批“闯中人”已经准备好了
  • 国铁集团去年收入12830亿元增3%,全年铁路运输利润总额创新高