当前位置: 首页 > news >正文

Docker安装Arroyo流处理引擎

介绍

Arroyo 是一个用 Rust 编写的分布式流处理引擎,旨在高效地对数据流执行有状态计算,有着与Apache Flink、Spanrk Streaming和Kafka Streams相同的实时流计算能力。与传统的批处理处理器不同,流式处理引擎可以运行 在有界和无界源上,一旦结果可用,就立即发出结果。
  • Arroyo 可以在本机读取和写入 JSON、Avro、Parquet 以及原始文本和二进制文件。自定义格式可以使用 UDF 实现。
  • 通过编写 Rust 用户定义的标量、聚合和异步函数来扩展内置 SQL,Python 即将推出。
  • 通过强大的 Arroyo Web UI 管理连接、开发和测试 SQL 查询以及监控管道。
  • 可以使用 REST API 创建、作和管理管道,从而大规模提供声明式编排。
  • Arroyo 随附大量连接器,可轻松集成到您的数据堆栈中
简而言之:Arroyo 允许您以亚秒级结果对大量实时数据提出复杂的问题。
Arroyo 可以自托管,也可以通过 Arroyo Systems 管理的 Arroyo Cloud 服务使用。

特征

  • 在 SQL 中定义的管道,支持复杂的分析查询
  • 可扩展至每秒数百万个事件
  • 有状态作,如 windows 和 joins
  • 用于容错和管道恢复的状态检查点
  • 支持水印的事件时间处理

Docker安装

docker run --rm \--name arroyo \-p 5115:5115 \ghcr.io/arroyosystems/arroyo:latest

默认安装相关配置在~/.config/arroyo目录下设置

Portainer平台安装

在Pull image》Image中输入: ghcr.io/arroyosystems/arroyo:latest,点击"Pull the image"拉取仓库镜像文件;稍等片刻拉取完毕后,Images列表中将会显示镜像信息;
确认镜像已拉取完毕,同样左侧Containers菜单中进入到容器管理面页,点击"Add container"进入添加容器配置界面;
设置docker容器映射端口,Port mapping设置如下:
  • 5115 =》5115:默认访问web ui的http端口;
Arroyo支持TOML或YAML配置,同时也支持将选项以环境变量的方式进行配置,相关配置参考官方: https://doc.arroyo.dev
在Runtime&resources中分配docker容器运行所需要的cpu、内存、内存交接空间等,此处根据实际硬件条件分配即可;
完成上述配置后,点击Deploy the container按钮发布docker容器并启动Arroyo服务,稍等片刻没有错误消息提示,即容器运行正常服务启动成功,如有错误提示,可在Containers列表中点击容器日志进行排查;

Arroyo功能概念

Arroyo提供了可视化的Web UI, 通过REST API与系统交互的Web应用程序。 它允许用户配置系统、创建管道并监控其状态。
Arroyo 的窗口函数实现基于 Apache DataFusion,因此这些窗口函数执行计算派生自DataFusion函数引用。
Arroyo依靠配置数据库来存储已注册的表和管道,并为API和Web UI查询提供注册数据支持。官方支持Postgres和本地默认安装的Sqlite,根据不同的实施规模,具有不同的可伸缩性和可靠性权衡:
  • Postgres 是大规模和分布式集群的推荐选择
  • 建议在本地运行和管道集群时使用 Sqlite
Arroyo支持将流处理检查快照存储到本地文件目录和远程对象存储服务,以便在发生故障时,可以从存储的快照检查点恢复计算数据和流管道,以及在需要时对数据做重新处理;
通常本地文件目录用于Arroyo单机部署后的开发与测试使用,生产环境部署建议使用远程对象存储,如:s3(Amazon)、GCS (Google Cloud)、 和 ABS (Azure),以及 Minio、Cloudflare R2 和 Backblaze B2.
Arroyo会使用匿名的方式采集服务运行的遥测数据,来了解系统的使用情况并帮助确定未来的优先级发展。可以关闭该数据的采集,在运行Arroyo服务时设置DISABLE_TELEMETRY=true,或设置环境变量ARROYO__DISABLE_TELEMETRY=true;
Arroyo基于事件流执行窗口计算逻辑,目前 Arroyo 支持三种类型的窗口:翻滚、滑动(又名跳跃)、会话窗口。
  • TUMBLE():滚动窗口是具有固定大小的连续、不重叠的窗口,每次开窗后在固定时长或大小内获取数据重新流处理,如:开窗10分钟后,重新开窗下一个10分钟。
  • HOP():滑动窗口是滚动窗口的扩展,增加了 “滑动”概念,即按固定时间间隔向后滑动,如:开窗10分钟,每次向后滑动1分钟,重新对10分钟内流事件处理(包含上一个窗口时间重叠部份数据)。
  • SESSION():会话窗口是非固定宽度的窗口,由用户的活动事件创建,如:会话窗口30分钟,获取用户流事件30分钟后,会话窗口关闭,并从下次会话流事件开始打开新的30分钟会话窗口。

访问WebUI

部署启动后通过浏览器访问: http://localhost:5115

流处理示例

通常在使用流处理的场景中,将源源不断的实时数据发送到消息队列中,进行解耦处理,后端应用服务通过读取队列数据进行业务计算与格式化后存储;因此Kafka做为消息队列服务中主流的开源中间件,在大量企业中实际应用,Kafka有着强大性能与稳定性,支持高并发、高吞吐,经过了多年的发展与壮大,已经成为互联网行业的必备消息队列产品,也得到了极大的赞誉;
本示例以向Kafka消息服务发送自定义实时测试数据,在Arroyo上通过SQL语法创建数据表和连接Kafka的管道,通过SQL语句将消息数据从Kafka拉取后,再发送到时序数据库QuestDB中进行存储与分析;
基本流程: 消息 =》 Kafka =》 Arroyo =》 QuestDB

1、创建Kafka数据

构造用于测试的Kafka消息结构示例:
{"create_time":"2025-06-24 20:30:09.903","event":"login","game_id":"笑傲江湖","role":"无","server_id":"s45","uid":148952381}

实时推送测试数据到Kafka是由独立的运行程序执行生成,自已写个Demo代码不难,此处不做详述。

2、创建时序数据表

QuestDB是专为SQL构建的高性能时间序列数据库,支持做为Arroyo流处理数据的实时分析接收器;QuestDB入门参考:《 QuestDB时序数据库快速入门-CSDN博客》
在QuestDB时序数据库中创建以下表。
--删除表
drop table game_event_table;
--创建表
CREATE TABLE game_event_table (  uid LONG,  game_id SYMBOL,role STRING,server_id STRING, event SYMBOL, create_time TIMESTAMP
) TIMESTAMP(create_time) PARTITION BY MONTH;
-- 查询
select * from game_event_table;

3、创建流处理程序

在Arroyo中创建流处理SQL,每5秒将滚动窗口数据推送到QuestDB。
QuestDB支持InfluxDB线路协议,公开了一个可接受ILP数据格式HTTP端点"http://localhost:9000/write";
通过Arroyo内置的webhook组件,将流处理后的数据重新转换成ILP格式,输出到QuestDB时序数据库中。
-- {"create_time":"2025-06-24 20:30:09.903","event":"login","game_id":"笑傲江湖","role":"无","server_id":"s45","uid":148952381}
-- WATERMARK为数据流事件的水印,水印用于事件时间固定偏移量,用来解决数据流到达窗口后的延迟问题
CREATE TABLE game_event_source (create_time TIMESTAMP NOT NULL,  uid BIGINT,event VARCHAR,game_id VARCHAR,role VARCHAR,server_id VARCHAR,WATERMARK FOR create_time AS create_time - INTERVAL '5 seconds'
) WITH (connector = 'kafka',format = 'json',bootstrap_servers = '192.168.1.5:9092',topic = 'game_behavior_arroyo',type = 'source','source.offset' = 'earliest','source.read_mode' = 'read_committed'
);-- 创建输出表(通过webhook推送http请求到questDB时序数据库的/write端点)
CREATE TABLE game_event_sink (value TEXT
) WITH (connector = 'webhook',endpoint = 'http://192.168.1.3:9000/write',format = 'raw_string'
);-- 将Kafka输入源数据读取通过后输出到game_event_sink表(通过webhook服务发送http ILP格式数据到questDb时序数据库)
INSERT INTO game_event_sink
SELECTARRAY_TO_STRING(ARRAY_AGG(CONCAT('game_event_table,','game_id=', game_id, ',','event=', event, ' ','uid=', uid, 'i,','role="', role, '",','server_id="', server_id, '" ',CAST(to_timestamp_nanos(create_time) AS BIGINT))),CHR(10)) AS value
FROM game_event_source
GROUP BY TUMBLE(INTERVAL '5 SECONDS');

4、发布与执行流处理程序

WebUI界面操作效果如下:
Check:对Query查询窗口中的SQL进行预加载,并生成流执行步骤与管道节点,通过可视化节点,观察是否异常
Priview:对Query查询窗口中的SQL进行预览试运行,此时并未正式发布,通过短暂运行检验SQL语法或管道是否存在错误,以便做纠正;
Launch:在Query查询窗口执行Priview无误后,即可点击进行发布,并正式调度运行;
在Launch后,回到Pipelines中查看到前流处理任务的执行状态与运行时间,默认Launch后,无法通过界面重新编辑SQL脚本,如需修改只能进入到流处理任务详情页,通过Stop按钮停止任务后,在回到Pipelines中删除流处理任务,再重新创建;

5、QuestDB查看数据

在QuestDB中通过查询game_event_table表记录,显示已接收到Arroyo通过webhook推送的数据;

参考

https://doc.arroyo.dev

https://questdb.com/blog/arroyo-to-questdb/

相关文章:

  • 17网做网站seo推广软件
  • 企业商城建站百度免费推广登录入口
  • 网站需要怎么做厦门人才网最新招聘信息
  • 网站显示已备案微信公众号怎么开通
  • 内蒙古创意星空网站开发网上售卖平台有哪些
  • 购买一个网站需要多少钱?关联词有哪些
  • Deepin Linux如何安装Terminus终端教程
  • RAG入门课程-学习笔记
  • 矩阵题解——螺旋矩阵【LeetCode】
  • Class00.2线性代数
  • rules写成动态
  • 解决npm安装依赖报错ERESOLVE unable to resolve dependency tree
  • excel中vba开发工具
  • C语言基础回顾与Objective-C核心类型详解
  • 15个AI模拟面试平台 和 简历修改 / 真人面试平台
  • 【服务器】服务器选型设计
  • windows 怎么下载yarn安装包并将下载的yarn文件移动到全局目录并添加执行权限?
  • 防火墙快速管理软件,66K超小巧
  • 数据文件写入技术详解:从CSV到Excel的ETL流程优化
  • 批量删除 word文档里面多个相同表格的特定行
  • 博图SCL语言中 RETURN 语句使用详解
  • 【项目】仿muduo库one thread one loop式并发服务器HTTP协议模块
  • 新能源知识库(67)高温热泵在电镀领域的应用
  • LVS-NAT负载均衡群集实战:原理、部署与问题排查
  • Django的CSRF保护机制
  • 工业“三体”联盟:ethernet ip主转profinet网关重塑设备新规则