当前位置: 首页 > news >正文

workflow:高效的流式工作架构

引言

workflow是sougou的一款开源框架 主要是以请求回应的模式解决各自网络/IO任务而发明的

 一.workflow的任务流
1.workflow都封装了哪些任务流

以请求回应的模式来解释

① 网络层

服务端

在服务端的request 相当于发送了一个获取客户端请求的请求,response相当于接收客户端的请求从而处理请求

客户端 

在客户端的request相当于向服务端发送了一个请求,response相当于接收服务端回复的消息

 ② 定时器

发送延时请求---->处理延时任务

③  IO层

发起处理文件IO的请求---->处理文件

④  cpu

发起耗时计算的请求---->处理耗时任务

总结:

耗时等待 :负载均衡 类似于fd通过%4取模的方式负载均衡给四个网络线程 ,当条件满足的时候把任务抛出给工作线程处理

 

耗时计算:当遇到计算任务的时候工作线程会把计算任务抛出给到go线程,go线程池通过任务调度处理计算任务

2.任务是如何来进行组织的呢

在workflow中通常以串联,并联,DAG 三种方式对任务进行组织

①串联

想象成你中学学习物理的时候的串联电路

②并联

想象成你中学学习物理的时候的并联电路

③DAG

在工作流(Workflow)系统中,DAG(有向无环图,Directed Acyclic Graph) 是一个非常核心的概念,它用于表示任务之间的执行依赖关系,画一个简单的图演示一下。

 

任务A 最先执行

任务B 和 任务C 并行执行,但都依赖于 任务A

任务D 要等 任务B 和 任务C 都完成后才执行

学过408中的操作系统信号量的同学应该可以很轻松理解这个模型

 

二.线程模型

main函数入口主线程

4个网络线程

20个工作线程

8个计算线程

 

 三.workflow的三板斧
1.三板斧是哪三板斧

first-->抽象粒度合适的异步任务

second--->通过任务流组织任务:串联,并联,以及DAG

third--->协调任务:counter,conditional,resource pool,message queue

2.message queue 

我们简单介绍一下message queue 消息队列的部分

队列的接口层

 在workflow中消息队列提供了上述这些接口 分别是 队列的创建 ,获取队列 ,往队列里面put 消息  ,设置队列是阻塞还是非阻塞 ,队列的销毁 

队列都有哪些成员

 此队列中有两个队列 分别是put队列 和 get队列 我们了解这两个队列的同时 我们要先理解多消费者和多生成者模型 , 在生产者往队列里面put消息的时候 是通过在put队列里面put操作 ,当消费者消费队列中的消息的时候 通过在get队列里面使用get操作,当get队列里面没有数据的时候,put队列里面的数据会转移到get队列里面供消费者消费,我们这样设计的目的是为了减少生产者和消费者之间的碰撞从而提供效率

 我们从中拿一个消息队列的头插法举个例子

 我们用void** 接收一个偏移后的message 用C语音的void**的好处是可以接收任何类型的数据,更加自由的去操作数据

我们对消息队列进行put操作前 我们要保证原子性 所以我们要通过pthread_mutex_lock 上锁 和pthread_mutex_unloc解锁

 后面我们通过条件变量保证数据的同步性pthread_cond_wait

 3.我们举一个简单的用workflow写的一个http请求回应代码
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/WFFacilities.h>static WFFacilities::WaitGroup waitGroup(1);  // 用于主线程等待任务完成void http_callback(WFHttpTask *task)
{int state = task->get_state();int error = task->get_error();if (state != WFT_STATE_SUCCESS){printf("HTTP request failed. State = %d, Error = %d\n", state, error);waitGroup.done();  // 通知主线程任务已完成return;}const void *body;size_t body_len;task->get_resp()->get_parsed_body(&body, &body_len);printf("HTTP Response:\n%.*s\n", (int)body_len, (const char *)body);waitGroup.done();  // 通知主线程任务已完成
}int main()
{signal(SIGPIPE, SIG_IGN);  // 忽略 SIGPIPE 信号const char *url = "http://www.github.com/";WFHttpTask *task = WFTaskFactory::create_http_task(url, 4, 2, http_callback);task->start();  // 异步启动任务waitGroup.wait();  // 主线程等待任务完成return 0;
}

我们把一个url 放入到一个任务流中 并设置回调函数 

任务流对github发送request github收到请求 并 回复response给我们 触发回调

打印http://www.example.com 的 HTTP 响应内容。

相关文章:

  • day31python打卡
  • c++使用protocol buffers
  • DeepSeek R2 或将发布,压力给到梁文锋
  • MySQL中添加一个具有创建数据库权限的用户
  • layui 介绍
  • SpringBean模块(三)具有生命周期管理能力的类(1)AutowireCapableBeanFactory
  • JAVA基础——方法和类型转换
  • 操作系统————四种动态分区分配算法详解(首次适应,最坏适应,最佳适应,邻近适应)
  • 【Java高阶面经:微服务篇】3.熔断机制深度优化:从抖动治理到微服务高可用架构实战
  • Python Day27 学习
  • Linux僵死进程以及文件操作
  • LW-CTrans:一种用于三维医学图像分割的轻量级CNN与Transformer混合网络|文献速递-深度学习医疗AI最新文献
  • RFID技术在半导体晶圆卡塞盒中的应用方案
  • 计算机可以深度结合、并且很有发展空间的领域
  • 机器学习 day05
  • Git客户端安装、操作
  • Vue3 中使用 provide/inject 实现跨层级组件传值失败的原因及解决方案
  • Vue之入门(Vue是什么以及Vue工作原理)
  • 数据要素如何重构人力资本升级
  • 消息传递--树形dp--50?!
  • 西安市长安区与航天基地区政合一管理,党政一把手分任基地党工委正副书记
  • 上海合作组织减贫和可持续发展论坛开幕,沈跃跃宣读习近平主席贺信
  • 国际观察丨美中东政策生变,以色列面临艰难选择
  • 美国考虑让移民上真人秀竞逐公民权,制片人称非现实版《饥饿游戏》
  • 官方通报汕头违建豪宅“英之园”将强拆:对有关人员严肃追责问责
  • 车主质疑零跑汽车撞车后AEB未触发、气囊未弹出,4S店:其把油门当刹车