企业微信消息推送系统设计:基于ES(事件溯源思想)解耦规则与任务架构
1. 项目背景与需求分析
在企业日常运营中,及时准确的消息推送是保障信息传递效率的关键。基于企业微信的发消息功能,我们需要构建一个灵活可靠的消息推送系统,支持三种发送规则:
- 立即发送:创建规则后立即发送消息
- 定时发送:在指定时间点发送消息
- 周期发送:按照cron表达式定期发送消息
目标受众可以是企业微信中的部门、标签或指定用户。系统需要处理的核心挑战包括:
- 规则变更时的状态一致性
- 发场景下的任务执行
- 消息发送的可靠性和可追溯性
- 系统扩展性和维护性
2. 核心设计思想:事件溯源与任务解耦
2.1 什么是事件溯源?
事件溯源是一种架构模式,其核心思想是:不存储当前状态,而是存储导致状态变化的一系列事件。与传统CRUD系统中直接更新当前状态不同,ES通过持久化事件流来重建状态。
2.2 传统设计的挑战
在传统的消息推送系统中,通常会将规则与执行紧密耦合,这会导致:
- 状态管理复杂:规则变更时需要更新已生成的任务状态
- 并发冲突:需要复杂的分布式锁机制来保证数据一致性
- 审计困难:难以追溯完整的规则变更和执行历史
2.3 我们的解决方案
借鉴**事件溯源(Event Sourcing)**架构思想,通过引入任务层实现规则与执行的解耦:在我们的设计中,wecom_msg_send_task表实际上承担了"事件存储"的部分角色:
-- 每个任务代表一个"消息发送事件"
INSERT INTO wecom_msg_send_task (rule_id, type, planned_execute_time, status, created_at
) VALUES (?, ?, ?, 1, NOW()); -- 状态1:待执行,相当于事件已存储但未处理
规则创建/更新 → 生成任务 → 执行任务 → 记录日志
核心设计原则:
- 任务只增不改:避免并发写冲突。这一块的设计是重点,借鉴事件溯源思想中的只增不改的想法,规则的变更,通过新增任务实现,避免了复杂的更新逻辑
- 规则变更通过新任务体现:
- 乐观并发控制:降低分布式锁的复杂度
3. 系统架构设计
3.1 核心组件架构
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ 规则管理器 │ │ 规则调度器 │ │ 任务调度器 │
│ │ │ │ │ │
│ - 创建/更新规则 │───▶│ - 扫描周期规则 │───▶│ - 扫描待执行任务 │
│ - 生成立即任务 │ │ - 生成周期任务 │ │ - 并发执行任务 │
│ - 生成定时任务 │ │ - 排除时间检查 │ │ - 乐观锁控制 │
└─────────────────┘ └──────────────────┘ └──────────────────┘│ │ │▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ 规则表 │ │ 任务表 │ │ 发送日志表 │
└─────────────────┘ └──────────────────┘ └──────────────────┘
3.2 数据库表结构概览
-- 规则表:存储消息发送规则配置
CREATE TABLE "public"."wecom_msg_rule" ("id" bigserial PRIMARY KEY,"name" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,"type" int2 NOT NULL DEFAULT nextval('wecom_msg_rule_id_seq'::regclass),"to_party" varchar(1000) COLLATE "pg_catalog"."default" NOT NULL,"to_tag" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,"to_user" varchar(20000) COLLATE "pg_catalog"."default" NOT NULL,"content_id" int8 NOT NULL,"send_time" timestamp(6) NOT NULL,"cron_expression" json NOT NULL,"status" int2 NOT NULL,"operator_id" int8 NOT NULL,"created_at" timestamp(6) NOT NULL,"updated_at" timestamp(6) NOT NULL,"expected_receiver_count" int8 NOT NULL,"operator_name" varchar(255) COLLATE "pg_catalog"."default" NOT NULL
)
;
COMMENT ON COLUMN "public"."wecom_msg_rule"."id" IS '主键id';
COMMENT ON COLUMN "public"."wecom_msg_rule"."name" IS '规则名';
COMMENT ON COLUMN "public"."wecom_msg_rule"."type" IS '发送类型1:立即发送 2:定时发送 3:周期发送';
COMMENT ON COLUMN "public"."wecom_msg_rule"."to_party" IS '目标部门ID列表,使用”|“分隔';
COMMENT ON COLUMN "public"."wecom_msg_rule"."to_tag" IS '目标标签ID列表,使用”|“分隔';
COMMENT ON COLUMN "public"."wecom_msg_rule"."to_user" IS '目标用户ID列表,使用”|“分隔';
COMMENT ON COLUMN "public"."wecom_msg_rule"."content_id" IS '消息内容ID';
COMMENT ON COLUMN "public"."wecom_msg_rule"."send_time" IS '定时发送时间';
COMMENT ON COLUMN "public"."wecom_msg_rule"."cron_expression" IS '周期发送规则';
COMMENT ON COLUMN "public"."wecom_msg_rule"."status" IS '状态 1:启用 2:禁用 3:已推送';
COMMENT ON COLUMN "public"."wecom_msg_rule"."operator_id" IS '操作者id';
COMMENT ON COLUMN "public"."wecom_msg_rule"."created_at" IS '创建时间';
COMMENT ON COLUMN "public"."wecom_msg_rule"."updated_at" IS '更新时间';
COMMENT ON COLUMN "public"."wecom_msg_rule"."expected_receiver_count" IS '预计推送人数';
COMMENT ON COLUMN "public"."wecom_msg_rule"."operator_name" IS '操作者姓名';-- 任务表:规则与执行之间的缓冲层
CREATE TABLE "public"."wecom_msg_send_log" ("id" bigserial PRIMARY KEY,"rule_id" int8 NOT NULL,"content_id" int8 NOT NULL,"wecom_msg_id" varchar COLLATE 