AimRT 从零到一:官方示例精讲 —— 三、Executor示例.md
Executor示例
官方仓库:executor
配置文件(configuration_executor.yaml
)
依据官方示例项目结构自行编写YAML配置文件:
# 基础信息
base_info:project_name: Logger # 项目名称build_mode_tags: ["EXAMPLE", "SIMULATION", "TEST_CAMERA"] # 构建模式标签aimrt_import_options: # AimRT框架的构建选项AIMRT_BUILD_TESTS: "OFF" # 是否构建测试代码AIMRT_BUILD_EXAMPLES: "ON" # 是否构建示例代码AIMRT_BUILD_DOCUMENT: "OFF" # 是否构建文档AIMRT_BUILD_RUNTIME: "ON" # 是否构建运行时核心AIMRT_BUILD_CLI_TOOLS: "OFF" # 是否构建命令行工具AIMRT_BUILD_WITH_PROTOBUF: "ON" # 是否启用Protobuf支持AIMRT_USE_LOCAL_PROTOC_COMPILER: "OFF" # 是否使用本地protoc编译器AIMRT_BUILD_WITH_ROS2: "OFF" # 是否集成ROS2支持AIMRT_BUILD_NET_PLUGIN: "OFF" # 是否构建网络插件AIMRT_BUILD_ROS2_PLUGIN: "OFF" # 是否构建ROS2插件# 模块
modules:- name: executor_module # 最基本的 cpp executor 示例- name: executor_co_module # 基于协程接口使用 executor 功能的示例- name: executor_co_loop_module # 基于协程接口使用 executor 功能实现定时循环的示例- name: real_time_module # executor 实时性相关功能的示例# pkg
pkgs:- name: executor_pkg # 包名modules:- name: executor_module- name: executor_co_module- name: executor_co_loop_module- name: real_time_module# 部署
deploy_modes:- name: local_deploy # 部署模式名称deploy_ins: # 部署实例- name: local_ins_executor # 实例名称pkgs:- name: executor_pkg # 实例加载的包
运行aimrt_clt
工具生成脚手架代码
aimrt_cli gen -p configuration_executor.yaml -o executor
module目录
executor_module
一个最基本的 cpp executor 示例,演示内容包括:
- 如何获取执行器;
- 如何投递任务到执行器中执行;
- 如何使用线程安全型执行器;
- 如何投递定时任务到执行器中;
说明:
-
此示例创建了一个
ExecutorModule
,会在Initialize
时获取以下三种执行器:- 名称为
work_executor
的普通执行器; - 名称为
thread_safe_executor
的线程安全型执行器; - 名称为
time_schedule_executor
的支持定时任务的执行器;
- 名称为
-
ExecutorModule
模块在Start
阶段会依次使用获取的执行器运行具体的逻辑任务:-
work_executor
:投递一个简单的任务到其中执行; -
thread_safe_executor
:向该执行器中一次性投递 10000 个任务,用于递增一个整数 n,并在最终打印出 n 的值,由于执行器是线程安全,故 n 最终值还是 10000; -
time_schedule_executor
:通过该执行器实现了一个间隔 1s 的定时循环;
-
-
此示例将
ExecutorModule
集成到executor_pkg
中,并在配置文件中加载此 Pkg;
模块定义(executor_module.h
)
#pragma once#include "aimrt_module_cpp_interface/module_base.h"namespace Executor::executor_module {// 执行器模块类,继承自模块基类
class ExecutorModule : public aimrt::ModuleBase {public:ExecutorModule() = default;~ExecutorModule() override = default;// 获取模块信息aimrt::ModuleInfo Info() const override {return aimrt::ModuleInfo{.name = "ExecutorModule"};} // 复制官方代码过来后需要添加命名空间 aimrt// 初始化模块bool Initialize(aimrt::CoreRef aimrt_ptr) override;// 启动模块bool Start() override;// 关闭模块void Shutdown() override;private:// 获取日志记录器auto GetLogger() { return core_.GetLogger(); }// 简单执行演示void SimpleExecuteDemo();// 线程安全演示void ThreadSafeDemo();// 时间调度演示void TimeScheduleDemo();private:aimrt::CoreRef core_; // AIMRT框架核心引用aimrt::executor::ExecutorRef work_executor_; // 普通执行器aimrt::executor::ExecutorRef thread_safe_executor_; // 线程安全执行器std::atomic_bool run_flag_ = true; // 运行标志(原子变量)uint32_t loop_count_ = 0; // 循环计数器aimrt::executor::ExecutorRef time_schedule_executor_; // 时间调度执行器
};} // namespace Executor::executor_module
这里主要是定义了几个AimRT的执行器句柄和会用到的变量
模块实现(executor_module.cc
)
初始化阶段
// 初始化模块
bool ExecutorModule::Initialize(aimrt::CoreRef core) {// 保存AIMRT框架句柄core_ = core;// 获取工作执行器work_executor_ = core_.GetExecutorManager().GetExecutor("work_executor");AIMRT_CHECK_ERROR_THROW(work_executor_, "无法获取工作执行器(work_executor)");// 获取线程安全执行器thread_safe_executor_ =core_.GetExecutorManager().GetExecutor("thread_safe_executor");AIMRT_CHECK_ERROR_THROW(thread_safe_executor_ && thread_safe_executor_.ThreadSafe(),"无法获取线程安全执行器(thread_safe_executor)");// 获取时间调度执行器time_schedule_executor_ =core_.GetExecutorManager().GetExecutor("time_schedule_executor");AIMRT_CHECK_ERROR_THROW(time_schedule_executor_ && time_schedule_executor_.SupportTimerSchedule(),"无法获取时间调度执行器(time_schedule_executor)");AIMRT_INFO("初始化成功");return true;
}
主要操作是根据名称获取执行器,执行器名称会在启动时由config.yaml文件确定,例如:
executor:executors:- name: work_executor # 执行器名称type: asio_thread # 执行器类型:基于 ASIO 的线程池,options:thread_num: 2 # 分配的线程数量,用于并发处理任务- name: thread_safe_executor # 线程安全执行器:通过 ASIO Strand 实现串行执行,避免竞态条件type: asio_strand # 执行器类型:基于 ASIO 的 strand 包装,确保线程安全options:bind_asio_thread_executor_name: work_executor # 绑定的基础执行器,实际任务由其线程池调度执行- name: time_schedule_executor # 定时任务执行器:用于周期性/定时调度任务type: asio_thread # 执行器类型:使用独立的 ASIO 线程池执行定时任务options:thread_num: 2 # 分配的线程数量,可根据定时任务复杂度调整
运行阶段
// 启动模块
bool ExecutorModule::Start() {// 测试简单执行SimpleExecuteDemo();// 测试线程安全执行ThreadSafeDemo();// 测试时间调度执行TimeScheduleDemo();AIMRT_INFO("启动成功");return true;
}// 任务函数实现// 简单执行演示
void ExecutorModule::SimpleExecuteDemo() {work_executor_.Execute([this]() { AIMRT_INFO("这是一个简单任务"); });
}// 线程安全演示
void ExecutorModule::ThreadSafeDemo() {uint32_t n = 0;for (uint32_t ii = 0; ii < 10000; ++ii) {thread_safe_executor_.Execute([&n]() { n++; }); // 线程安全执行器// work_executor_.Execute([&n]() { n++; }); // 普通执行器}std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待1秒AIMRT_INFO("n的值为: {}", n);
}// 时间调度演示
void ExecutorModule::TimeScheduleDemo() {if (!run_flag_) return; // 检查运行标志AIMRT_INFO("循环计数: {}", loop_count_++);// 1秒后再次执行本函数time_schedule_executor_.ExecuteAfter(std::chrono::seconds(1),std::bind(&ExecutorModule::TimeScheduleDemo, this));
}
通过调用执行器句柄的Execute
或者ExecuteAfter
向执行器中投递任务
停止阶段
// 关闭模块
void ExecutorModule::Shutdown() {run_flag_ = false; // 设置运行标志为falsestd::this_thread::sleep_for(std::chrono::seconds(1)); // 等待1秒AIMRT_INFO("关闭成功");
}
将run_flag_
变量置false,用于控制定时器执行器的任务停止。
对应的启动配置文件
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.aimrt:log:core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Offbackends:- type: consoleexecutor:executors:- name: work_executortype: asio_threadoptions:thread_num: 2- name: thread_safe_executortype: asio_strandoptions:bind_asio_thread_executor_name: work_executor- name: time_schedule_executortype: asio_threadoptions:thread_num: 2module:pkgs:- path: ./libexecutor_pkg.soenable_modules: [ExecutorModule]modules:- name: ExecutorModulelog_lvl: INFO
executor_co_module
一个基于协程接口使用 executor 功能的示例,演示内容包括:
- 如何以协程的方式使用执行器;
模块定义(executor_co_module.h
)
#pragma once#include "aimrt_module_cpp_interface/co/aimrt_context.h"
#include "aimrt_module_cpp_interface/co/async_scope.h"
#include "aimrt_module_cpp_interface/co/task.h"
#include "aimrt_module_cpp_interface/module_base.h"namespace Executor::executor_co_module {using namespace aimrt;// 定义一个协程模块,继承自 AimRT 的 ModuleBase 接口
class ExecutorCoModule : public aimrt::ModuleBase {public:ExecutorCoModule() = default;~ExecutorCoModule() override = default;// 返回模块信息aimrt::ModuleInfo Info() const override {return ModuleInfo{.name = "ExecutorCoModule"};}// 初始化模块bool Initialize(aimrt::CoreRef aimrt_ptr) override;// 启动模块,执行实际的协程逻辑bool Start() override;// 模块关闭逻辑,清理资源void Shutdown() override;private:// 获取日志器auto GetLogger() { return core_.GetLogger(); }// 简单执行器示例:演示普通线程池调度co::Task<void> SimpleExecuteDemo();// 线程安全执行器示例:演示多个任务安全执行co::Task<void> ThreadSafeDemo();// 定时执行器示例:定时打印计数co::Task<void> TimeScheduleDemo();private:aimrt::CoreRef core_; // 框架核心引用co::AimRTContextctx_; // 协程上下文对象,可以用来获取某个执行器的"协程封装接口"co::AsyncScopescope_; // 协程作用域,用于启动协程(spawn)和等待协程完成(complete),可以跟踪和管理多个异步协程的生命周期std::atomic_bool run_flag_ = true; // 控制定时任务运行的标志位
};} // namespace Executor::executor_co_module
模块实现(executor_co_module.cc
)
初始化阶段
bool ExecutorCoModule::Initialize(aimrt::CoreRef core) {// 保存框架核心引用core_ = core;// 创建协程上下文对象ctx_ = co::AimRTContext(core_.GetExecutorManager());// 检查普通执行器是否可用auto work_executor = core_.GetExecutorManager().GetExecutor("work_executor");AIMRT_CHECK_ERROR_THROW(work_executor, "无法获取 work_executor 执行器");// 检查线程安全执行器是否可用auto thread_safe_executor =core_.GetExecutorManager().GetExecutor("thread_safe_executor");AIMRT_CHECK_ERROR_THROW(thread_safe_executor && thread_safe_executor.ThreadSafe(),"无法获取 thread_safe_executor 执行器或不支持线程安全");// 检查定时执行器是否可用auto time_schedule_executor =core_.GetExecutorManager().GetExecutor("time_schedule_executor");AIMRT_CHECK_ERROR_THROW(time_schedule_executor &&time_schedule_executor.SupportTimerSchedule(), "无法获取time_schedule_executor 执行器或不支持定时调度");// 初始化成功AIMRT_INFO("模块初始化成功。");return true;
}
- 注意这里获取的三个执行器,只是用来检查是否可用,将错误限制在初始化阶段
运行阶段
bool ExecutorCoModule::Start() {// 启动普通协程任务scope_.spawn(co::On(co::InlineScheduler(), SimpleExecuteDemo()));// 启动线程安全的协程任务scope_.spawn(co::On(co::InlineScheduler(), ThreadSafeDemo()));// 启动定时执行的协程任务scope_.spawn(co::On(co::InlineScheduler(), TimeScheduleDemo()));AIMRT_INFO("模块启动成功。");return true;
}
调用三个任务函数
-
scope_.spawn(...)
用于向协程scope_
这个协程作用域中启动一个协程任务 -
co::InlineScheduler()
指的是在当前线程执行协程的第一段代码 -
co::On(scheduler, SimpleExecuteDemo())
表示让SimpleExecuteDemo()
协程在scheduler
上开始执行
co::Task<void> ExecutorCoModule::SimpleExecuteDemo() {// 获取 work_executor 的调度器auto work_scheduler = ctx_.GetScheduler("work_executor");// 打印当前协程的线程 IDstd::ostringstream oss1;oss1 << std::this_thread::get_id();std::cout << "[SimpleExecuteDemo] 当前协程的线程 ID: " << oss1.str()<< std::endl;// 将当前协程调度到 work_executor 上执行co_await co::Schedule(work_scheduler);// 打印切换后的线程 IDstd::ostringstream oss2;oss2 << std::this_thread::get_id();std::cout << "[SimpleExecuteDemo] 切换执行器后的线程 ID: " << oss2.str()<< std::endl;// 执行任务内容AIMRT_INFO("执行普通任务:SimpleExecuteDemo");co_return;
}
-
ctx_.GetScheduler("work_executor")
通过执行器名称获取协程封装版本的调度器 -
co_await co::Schedule(work_scheduler)
挂起当前线程,后续让work_scheduler
异步执行后续代码 - 可以简单添加两个日志打印,观察不同线程执行协程函数的切换效果
co::Task<void> ExecutorCoModule::ThreadSafeDemo() {// 获取 thread_safe_executor 的调度器auto thread_safe_scheduler = ctx_.GetScheduler("thread_safe_executor");co::AsyncScope scope;uint32_t n = 0; // 计数器auto task = [&n]() -> co::Task<void> {n++;co_return;};// 启动 10000 个线程安全任务for (size_t ii = 0; ii < 10000; ++ii) {scope.spawn(co::On(thread_safe_scheduler, task()));}// 等待所有任务执行完毕co_await co::On(co::InlineScheduler(), scope.complete());// 输出最终计数值AIMRT_INFO("线程安全任务已完成,n 的值为 {}", n);co_return;
}
-
这里的区别是协程内部再启动了10000个异步的协程
-
co::On(co::InlineScheduler(), ...)
让恢复操作 在当前线程(InlineScheduler)中执行
co::Task<void> ExecutorCoModule::TimeScheduleDemo() {AIMRT_INFO("开始定时任务循环。");// 获取 time_schedule_executor 的调度器auto time_scheduler = ctx_.GetScheduler("time_schedule_executor");// 切换协程到定时调度线程池co_await co::Schedule(time_scheduler);uint32_t count = 0;while (run_flag_) {count++;AIMRT_INFO("定时循环第 {} 次 -------------------------", count);// 延迟 1 秒后再次调度(实现定时循环)co_await co::ScheduleAfter(time_scheduler, std::chrono::seconds(1));}AIMRT_INFO("退出定时任务循环。");co_return;
}
-
co_await co::ScheduleAfter(time_scheduler,...)
让等待任务完成后,由time_scheduler
执行器中的线程恢复
停止阶段
void ExecutorCoModule::Shutdown() {// 停止定时任务循环run_flag_ = false;// 等待所有协程任务完成co::SyncWait(scope_.complete());AIMRT_INFO("模块已关闭。");
}
- 停止定时任务
-
co::SyncWait(scope_.complete())
阻塞当前线程,直到scope_.complete()
所控制的所有协程任务完成
对应的启动配置文件
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.aimrt:log:core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Offbackends:- type: consoleexecutor:executors:- name: work_executortype: asio_threadoptions:thread_num: 2- name: thread_safe_executortype: asio_threadoptions:thread_num: 1- name: time_schedule_executortype: asio_threadoptions:thread_num: 2module:pkgs:- path: ./libexecutor_pkg.soenable_modules: [ExecutorCoModule]modules:- name: ExecutorCoModulelog_lvl: INFO
executor_co_loop_module
一个基于协程接口使用 executor 功能实现定时循环的示例,演示内容包括:
- 如何以协程的方式使用执行器实现定时循环;
与上一个示例中的定时器类似,因此这里只简单地分析。
模块定义(executor_co_loop_module.h
)
#pragma once// 引入原子操作支持,用于线程安全的布尔变量
#include <atomic>// 引入 AimRT 协程相关的接口
#include "aimrt_module_cpp_interface/co/aimrt_context.h"
#include "aimrt_module_cpp_interface/co/async_scope.h"
#include "aimrt_module_cpp_interface/co/task.h"#include "aimrt_module_cpp_interface/module_base.h"// 所有代码都放在对应命名空间下,防止命名冲突
namespace Executor::executor_co_loop_module {using namespace aimrt;// 自定义模块类,继承自 AimRT 框架的 ModuleBase 基类
class ExecutorCoLoopModule : public aimrt::ModuleBase {public:// 构造函数:默认构造ExecutorCoLoopModule() = default;// 析构函数:使用默认析构~ExecutorCoLoopModule() override = default;// 模块信息,用于注册模块时描述其名称ModuleInfo Info() const override {return ModuleInfo{.name = "ExecutorCoLoopModule"};}// 初始化函数,框架会在模块加载时调用bool Initialize(aimrt::CoreRef aimrt_ptr) override;// 启动函数,框架会在模块 Start 时调用bool Start() override;// 关闭函数,框架会在模块卸载或退出时调用void Shutdown() override;private:// 获取模块的日志对象,用于打印日志auto GetLogger() { return core_.GetLogger(); }// 主逻辑循环,使用协程定义co::Task<void> MainLoop();private:// 框架核心对象引用,用于访问其他模块/调度器等aimrt::CoreRef core_;// 指向时间调度执行器的引用,用于控制调度策略aimrt::executor::ExecutorRef time_schedule_executor_;// 异步任务作用域,用于批量管理协程任务co::AsyncScope scope_;// 控制循环运行状态的原子布尔标志,线程安全std::atomic_bool run_flag_ = true;
};} // namespace aimrt::examples::cpp::executor::executor_co_loop_module
- 创建变量:核心句柄、执行器句柄、协程作用域管理器、循环控制标志
模块实现(executor_co_loop_module.cc
)
初始化阶段
bool ExecutorCoLoopModule::Initialize(aimrt::CoreRef core) {// 保存框架核心对象引用core_ = core;// 从框架中获取名为 "time_schedule_executor" 的调度器执行器time_schedule_executor_ =core_.GetExecutorManager().GetExecutor("time_schedule_executor");// 校验是否成功获取,并且该执行器支持定时调度功能AIMRT_CHECK_ERROR_THROW(time_schedule_executor_ && time_schedule_executor_.SupportTimerSchedule(),"无法获取支持定时调度的执行器:time_schedule_executor");// 打印初始化成功日志(中文)AIMRT_INFO("模块初始化成功。");return true;
}
运行阶段
bool ExecutorCoLoopModule::Start() {// 在内联调度器上启动主协程循环任务scope_.spawn(co::On(co::InlineScheduler(), MainLoop()));// 打印启动成功日志(中文)AIMRT_INFO("模块启动成功。");return true;
}// 模块的主协程逻辑
co::Task<void> ExecutorCoLoopModule::MainLoop() {// 打印循环开始日志(中文)AIMRT_INFO("协程主循环开始。");// 使用 time_schedule_executor 创建调度器对象auto scheduler = aimrt::co::AimRTScheduler(time_schedule_executor_);// 切换协程到定时调度线程池co_await co::Schedule(scheduler);uint32_t count = 0;// 开始循环,直到 run_flag_ 被设置为 falsewhile (run_flag_) {count++;// 打印当前循环次数(中文)AIMRT_INFO("协程主循环第 {} 次 -------------------------", count);// 延迟 1 秒后再次调度,实现定时循环逻辑co_await co::ScheduleAfter(scheduler, std::chrono::seconds(1));}// 循环结束后打印退出日志(中文)AIMRT_INFO("协程主循环已退出。");co_return;
}
停止阶段
void ExecutorCoLoopModule::Shutdown() {// 设置运行标志为 false,用于结束循环run_flag_ = false;// 阻塞等待所有协程任务完成co::SyncWait(scope_.complete());// 打印关闭成功日志(中文)AIMRT_INFO("模块已关闭。");
}
对应的启动配置文件
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.aimrt:log:core_lvl: INFO # Trace/Debug/Info/Warn/Error/Fatal/Offbackends:- type: consoleexecutor:executors:- name: time_schedule_executortype: asio_threadmodule:pkgs:- path: ./libexecutor_pkg.soenable_modules: [ExecutorCoLoopModule]modules:- name: ExecutorCoLoopModulelog_lvl: INFO
real_time_module
一个 executor 实时性相关功能的示例,演示内容包括:
- 如何通过配置文件设置执行器的线程调度策略和优先级、绑核策略等;
- 本示例仅在 linux 上有效;
- 本示例重点在于最后的配置文件
模块定义(real_time_module.h
)
#pragma once // 防止头文件被多次包含#include <atomic> // 提供原子类型 std::atomic,用于线程安全标志控制// 引入协程相关接口
#include "aimrt_module_cpp_interface/co/async_scope.h" // 异步作用域,管理协程生命周期
#include "aimrt_module_cpp_interface/co/task.h" // 协程任务 Task 定义
#include "aimrt_module_cpp_interface/module_base.h" // AIMRT 模块基类定义namespace Executor::real_time_module {// RealTimeModule 是一个继承自 AIMRT 框架的 ModuleBase 的模块实现
class RealTimeModule : public aimrt::ModuleBase {public:RealTimeModule() = default; // 默认构造函数~RealTimeModule() override = default; // 析构函数,确保虚析构行为// 实现模块信息接口,返回模块的名称ModuleInfo Info() const override {return ModuleInfo{.name = "RealTimeModule"};}// 模块初始化函数,在模块被加载时调用bool Initialize(aimrt::CoreRef aimrt_ptr) override;// 模块启动函数,在模块开始运行时调用bool Start() override;// 模块关闭函数,在模块卸载或退出时调用void Shutdown() override;private:// 获取模块的日志接口,用于打印日志信息auto GetLogger() { return core_.GetLogger(); }// 通过指定执行器名称启动一个协程工作循环void StartWorkLoopByExecutor(std::string_view executor_name);// 工作循环协程函数,接收一个执行器引用作为参数co::Task<void> WorkLoop(aimrt::executor::ExecutorRef executor_ptr);private:aimrt::CoreRef core_; // AIMRT 框架核心引用,用于访问系统资源co::AsyncScope scope_; // 管理模块中所有异步协程任务的作用域std::atomic_bool run_flag_ = true; // 控制定时循环协程是否继续运行的标志位
};} // namespace Executor::real_time_module
模块实现(real_time_module.cc
)
初始化阶段
// 初始化模块,保存 AIMRT 核心引用
bool RealTimeModule::Initialize(aimrt::CoreRef core) {// 保存 AIMRT 框架句柄core_ = core;// 日志:初始化成功AIMRT_INFO("初始化成功(Init succeeded).");return true;
}
运行阶段
// 启动模块,分别在三个不同类型的执行器上启动工作协程
bool RealTimeModule::Start() {StartWorkLoopByExecutor("sched_fifo_thread"); // 启动 FIFO 调度线程的协程StartWorkLoopByExecutor("sched_other_thread"); // 启动 OTHER 调度线程的协程StartWorkLoopByExecutor("sched_rr_thread"); // 启动 RR(轮询)调度线程的协程// 日志:启动成功AIMRT_INFO("启动成功(Start succeeded).");return true;
}// 启动指定执行器名的协程工作循环
void RealTimeModule::StartWorkLoopByExecutor(std::string_view executor_name) {auto executor = core_.GetExecutorManager().GetExecutor(executor_name);// 校验执行器存在并支持定时调度AIMRT_CHECK_ERROR_THROW(executor && executor.SupportTimerSchedule(),"获取执行器 '{}' 失败(Get executor '{}' failed).",executor_name);// 使用指定执行器调度 WorkLoop 协程,并托管给 scope_scope_.spawn(co::On(co::AimRTScheduler(executor), WorkLoop(executor)));
}// 工作协程逻辑:定时执行并打印线程和 CPU 信息
co::Task<void> RealTimeModule::WorkLoop(aimrt::executor::ExecutorRef executor) {try {// 日志:启动工作循环AIMRT_INFO("在执行器 {} 中启动工作循环(Start WorkLoop in {}).",executor.Name());#ifdef __linux__// 获取当前线程名称char thread_name[16];pthread_getname_np(pthread_self(), thread_name, sizeof(thread_name));// 获取线程调度策略和优先级int policy = 0;struct sched_param param;pthread_getschedparam(pthread_self(), &policy, ¶m);// 日志:打印线程信息AIMRT_INFO("执行器名: {}, 线程名: {}, 调度策略: {}, 优先级: {}(Executor name: ""{}, thread_name: {}, policy: {}, priority: {})",executor.Name(), thread_name, policy, param.sched_priority);
#endifuint32_t count = 0;while (run_flag_) {count++;// 记录当前时间戳(用于计算 sleep 时间)auto start_tp = std::chrono::steady_clock::now();// 协程挂起 1000 毫秒(使用指定调度器)co_await co::ScheduleAfter(co::AimRTScheduler(executor),std::chrono::milliseconds(1000));auto end_tp = std::chrono::steady_clock::now();#ifdef __linux__// 获取当前线程使用的 CPU 集合cpu_set_t cur_cpuset;CPU_ZERO(&cur_cpuset);auto pthread_getaffinity_np_ret = pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cur_cpuset);AIMRT_CHECK_ERROR_THROW(pthread_getaffinity_np_ret == 0,"调用 pthread_getaffinity_np 失败,错误码:{}(Call ""'pthread_getaffinity_np' get error: {})",pthread_getaffinity_np_ret);// 构造当前线程使用的 CPU ID 字符串uint32_t cpu_size = std::thread::hardware_concurrency();std::string cur_cpuset_str;for (int ii = 0; ii < cpu_size; ii++) {if (CPU_ISSET(ii, &cur_cpuset)) {cur_cpuset_str += (std::to_string(ii) + ", ");}}cur_cpuset_str = cur_cpuset_str.substr(0, cur_cpuset_str.size() - 2);// 获取当前运行在哪个 CPU 上(逻辑核)unsigned int current_cpu = 0, current_node = 0;int getcpu_ret = getcpu(¤t_cpu, ¤t_node);AIMRT_CHECK_ERROR_THROW(getcpu_ret == 0,"调用 getcpu 失败,错误码:{}(Call 'getcpu' get error: {})",getcpu_ret);// 日志:打印循环次数、耗时、CPU 信息等AIMRT_INFO("循环次数: {}, 执行器名: {}, 睡眠时间: {}, 当前 CPU: {}, NUMA 节点: ""{}, 使用 CPU 集合: '{}'(""Loop count: {}, executor name: {}, sleep for {}, cpu: {}, node: {}, ""use cpu: '{}')",count, executor.Name(), end_tp - start_tp, current_cpu, current_node,cur_cpuset_str);
#else// 非 Linux 下仅打印次数与耗时AIMRT_INFO("循环次数: {}, 执行器名: {}, 睡眠时间: {}(Loop count: {}, executor ""name: {}, sleep for {})",count, executor.Name(), end_tp - start_tp);
#endif}// 日志:协程正常退出AIMRT_INFO("退出工作循环(Exit WorkLoop).");} catch (const std::exception& e) {// 日志:协程因异常退出AIMRT_ERROR("工作循环异常退出,原因:{}(Exit WorkLoop with exception, {})",e.what());}co_return;
}
停止阶段
// 关闭模块,停止协程并等待其完成
void RealTimeModule::Shutdown() {try {// 设置退出标志,等待所有协程完成run_flag_ = false;co::SyncWait(scope_.complete());} catch (const std::exception& e) {// 日志:关闭失败,打印异常信息AIMRT_ERROR("关闭失败(Shutdown failed),异常信息:{}", e.what());return;}// 日志:关闭成功AIMRT_INFO("关闭成功(Shutdown succeeded).");
}
对应的启动配置文件
官网文档:配置文件-执行器相关
# Copyright (c) 2023, AgiBot Inc.
# All rights reserved.aimrt:log:core_lvl: INFO # AimRT 框架核心日志级别,可选值:Trace / Debug / Info / Warn / Error / Fatal / Offbackends:- type: console # 日志输出后端,console 表示打印到终端控制台executor:executors:- name: sched_fifo_thread # 执行器名称(唯一标识)type: asio_thread # 执行器类型:基于 Boost.Asio 的线程池模型options:thread_num: 1 # 启动的线程数thread_sched_policy: SCHED_FIFO:80 # 调度策略为实时 FIFO,优先级 80(Linux 实时调度)thread_bind_cpu: [0, 1] # 将该线程绑定到第 0 和 1 号 CPU 核心上(用于控制 CPU 亲和性)timeout_alarm_threshold_us: 100 # 执行超时时间阈值(单位微秒),超过则记录报警日志- name: sched_other_thread # 普通时间片调度策略执行器type: asio_threadoptions:thread_num: 1thread_sched_policy: SCHED_OTHER # 普通调度策略(非实时)thread_bind_cpu: [2, 3]timeout_alarm_threshold_us: 100- name: sched_rr_thread # 轮转调度策略执行器type: asio_threadoptions:thread_num: 1thread_sched_policy: SCHED_RR:80 # 轮转(Round Robin)调度策略,优先级 80thread_bind_cpu: [4, 5, 6]timeout_alarm_threshold_us: 100module:pkgs:- path: ./libexecutor_pkg.so # 模块包路径(编译出的动态库)enable_modules: [RealTimeModule] # 从该库中启用哪些模块类(可包含多个)modules:- name: RealTimeModule # 启用的模块名称(对应继承自 ModuleBase 的类名)log_lvl: INFO # 该模块的日志级别(覆盖 core_lvl,仅对模块本身有效)