消息中心系统架构设计
消息中心系统架构设计
背景
随着业务复杂度提升,公司内部各系统间消息通信需求激增,原有简单邮件/短信发送系统面临:
- 渠道扩展困难,新增渠道需改动核心代码
- 缺乏统一监控,问题定位效率低
- 高峰期系统稳定性差,无熔断保护
- 业务耦合严重,发送逻辑侵入业务代码
架构目标
- 高可用:99.99%可用性,支持百万级消息/日
- 可扩展:支持快速接入新消息渠道
- 低延迟:平均处理时延<200ms
- 强一致:消息不丢失,至少投递一次
- 可视化:全链路追踪,实时监控看板
系统架构
整体架构图
+-------------------+ +-----------------+ +-------------------+
| 客户端/业务系统 | --> | API网关 | --> | 消息接收服务 |
+-------------------+ +-----------------+ +---------+---------+
|
v
+-------------------+ +-----------------+ +---------+---------+
| 监控告警平台 | <-- | 消息处理引擎 | <-- | 异步消息队列 |
+-------------------+ +----+-----+------+ +-------------------+
| |
v v
+----------------+ +----------------+ +-----------------+
| 渠道适配层 | | 熔断流控层 | | 存储服务 |
+--------+-------+ +--------+-------+ +--------+--------+
| | |
v v v
+----------------+ +----------------+ +-----------------+
| 第三方渠道 | | 熔断器集群 | | MySQL/MongoDB |
+----------------+ +----------------+ +-----------------+
分层架构设计说明
1. 接入层
// 消息接收接口示例(Spring Boot)
@RestController
public class MessageController {
@PostMapping("/message/send")
@SentinelResource(value = "messageAPI", blockHandler = "apiBlockHandler")
public ResponseDTO send(@Valid @RequestBody MessageRequest request) {
// 基础校验
if (!channelConfig.isEnabled(request.getChannelType())) {
throw new ChannelDisabledException();
}
// 投递到消息队列
messageQueueService.asyncSend(request);
return ResponseDTO.success("消息已接收");
}
// 接口级流控降级
public ResponseDTO apiBlockHandler(BlockException ex) {
return ResponseDTO.fail(429, "系统繁忙,请稍后重试");
}
}
2. 异步处理层
// RabbitMQ消息处理器
@Component
@RabbitListener(queues = "msg.process.queue")
public class MessageQueueConsumer {
@HystrixCommand(fallbackMethod = "processFallback")
@SentinelResource(value = "messageProcess", blockHandler = "processBlockHandler")
public void processMessage(MessageRequest request) {
// 构造消息上下文
MessageContext context = messageBuilder.buildContext(request);
// 启动责任链处理
SendResult result = routeChain.startHandle(context);
// 持久化处理结果
messageLogService.logResult(context, result);
}
// 处理降级策略
private void processFallback(MessageRequest request) {
messageRetryService.scheduleRetry(request);
}
}
3. 核心路由引擎
// 增强型责任链实现(支持动态配置)
public class DynamicRouteChain {
private List<RouteHandler> handlers;
private RouteStrategyConfig config;
public SendResult start(MessageContext ctx) {
// 根据配置动态排序处理器
List<RouteHandler> sortedHandlers = config.getStrategy().sort(handlers);
for (RouteHandler handler : sortedHandlers) {
if (handler.shouldHandle(ctx)) {
SendResult result = handler.handle(ctx);
if (result != null) return result;
}
}
throw new RouteException("无可用路由处理器");
}
}
// 路由策略配置示例(支持热更新)
public enum RouteStrategy {
PRIORITY_ORDER {
public List<RouteHandler> sort(List<RouteHandler> handlers) {
// 按配置优先级排序
return handlers.stream()
.sorted(Comparator.comparingInt(h -> config.getOrder(h.getClass())))
.collect(Collectors.toList());
}
},
LOAD_BALANCE {
public List<RouteHandler> sort(List<RouteHandler> handlers) {
// 根据渠道负载情况动态排序
return loadBalanceSorter.sort(handlers);
}
};
}
4. 渠道适配层
// 渠道发送模板抽象类
public abstract class ChannelTemplate {
// 发送方法(模板方法模式)
public final SendResult send(Message message) {
preCheck(message);
SignResult sign = generateSign(message);
SendResult result = doSend(message, sign);
postProcess(result);
return result;
}
protected abstract void preCheck(Message message);
protected abstract SignResult generateSign(Message message);
protected abstract SendResult doSend(Message message, SignResult sign);
protected abstract void postProcess(SendResult result);
}
// 短信渠道实现
@Component
public class SmsChannel extends ChannelTemplate {
@Override
protected SendResult doSend(Message message, SignResult sign) {
// 调用具体短信服务商API
return smsClient.send(message.getMobile(),
message.getContent(),
sign.getSignName());
}
@Override
protected void preCheck(Message message) {
if (!Validator.isMobile(message.getMobile())) {
throw new InvalidParamException("手机号格式错误");
}
}
}
5. 熔断监控层
// 熔断状态监控看板
public class CircuitBreakerDashboard {
private Map<String, HealthCounts> stats = new ConcurrentHashMap<>();
@Scheduled(fixedRate = 5000)
public void refreshStats() {
HystrixCommandMetrics.getInstances().forEach(metrics -> {
HealthCounts health = metrics.getHealthCounts();
stats.put(metrics.getCommandKey().name(), health);
});
}
public CircuitStatus getStatus(String commandKey) {
HealthCounts health = stats.get(commandKey);
if (health.getErrorPercentage() > 50) {
return CircuitStatus.OPEN;
}
return health.getTotalRequests() < 20 ?
CircuitStatus.CLOSED : CircuitStatus.HALF_OPEN;
}
}
核心数据模型设计
// 消息实体
public class Message {
private Long msgId;
private String templateId;
private Map<String, Object> params;
private MessageType type;
private Set<ChannelType> specifiedChannels;
private Integer retryCount = 0;
private MessagePriority priority;
}
// 渠道配置实体
public class ChannelConfig {
private ChannelType type;
private Boolean enabled;
private Integer maxRetries;
private Long rateLimit; // 渠道级QPS限制
private String apiKey;
private String secret;
private String endpoint;
}
系统关键流程
消息发送主流程
sequenceDiagram
participant Client
participant Gateway
participant Receiver
participant Queue
participant Processor
participant Router
participant Channel
Client->>Gateway: 发送消息请求
Gateway->>Receiver: 路由请求
Receiver->>Queue: 异步持久化
Queue->>Processor: 触发消费
Processor->>Router: 启动路由
Router->>Channel: 选择渠道
Channel-->>Processor: 返回结果
Processor->>Queue: 确认消费
Processor->>Monitor: 上报指标
三级路由决策流程
graph TD
A[开始] --> B{有指定渠道?}
B -->|是| C[指定渠道发送]
C --> D{成功?}
D -->|是| E[流程结束]
D -->|否| F[记录失败渠道]
B -->|否| G[全渠道广播]
G --> H{任一成功?}
H -->|是| E
H -->|否| I[分级降级发送]
I --> J[按优先级排序]
J --> K{遍历可用渠道}
K --> L[尝试发送]
L --> M{成功?}
M -->|是| E
M -->|否| N[记录失败]
N --> O{还有下一个?}
O -->|是| K
O -->|否| P[全部失败]
核心配置示例
# application.yml
message:
route:
strategies:
- type: SPECIFIC
order: 1
- type: BROADCAST
order: 2
- type: FALLBACK
order: 3
fallback-order: [SMS, EMAIL, APP_PUSH, WECHAT]
circuit-breaker:
sms:
request-volume-threshold: 20
error-threshold-percentage: 50
sleep-window: 5000
email:
request-volume-threshold: 15
error-threshold-percentage: 40
sentinel:
flow:
rules:
- resource: messageAPI
grade: 1
count: 1000
- resource: smsChannel
grade: 1
count: 200
系统扩展点设计
渠道热插拔
// 渠道注册中心
public class ChannelRegistry {
private Map<ChannelType, ChannelTemplate> channels = new ConcurrentHashMap<>();
public void registerChannel(ChannelType type, ChannelTemplate channel) {
channels.put(type, channel);
}
public void unregisterChannel(ChannelType type) {
channels.remove(type);
}
public ChannelTemplate getChannel(ChannelType type) {
return channels.get(type);
}
}
动态规则配置
// Nacos配置监听示例
@NacosConfigListener(dataId = "messageRouteRules", groupId = "DEFAULT_GROUP")
public void updateRouteRules(String newRules) {
RouteRuleParser parser = new RouteRuleParser();
this.routeRules = parser.parse(newRules);
log.info("路由规则已更新: {}", newRules);
}
多维度监控
// 监控指标采集器
public class MetricsCollector {
@SentinelMetric
public void collectRouteMetrics(MessageContext ctx, SendResult result) {
// 记录成功率
metricsCounter.incSuccess(ctx.getChannelType());
// 记录延迟
long latency = System.currentTimeMillis() - ctx.getCreateTime();
histogram.record(latency);
// 异常统计
if (!result.isSuccess()) {
errorLogger.logError(ctx, result);
}
}
}
容灾设计策略
多级降级机制
-
渠道级熔断:单个渠道连续失败触发自动熔断
-
服务级降级:核心服务不可用时切换备用集群
-
系统级限流:全局QPS超过阈值时启动拒绝策略
跨机房容灾
public class CrossIDCChannelSelector {
public Channel select(MessageContext ctx) {
// 优先同机房服务
Channel primary = localChannel.get(ctx.getType());
if (primary.isHealthy()) return primary;
// 切换异地备用
return remoteChannel.get(ctx.getType());
}
}
最终一致性保障
// 分布式事务补偿任务
@Scheduled(cron = "0 */5 * * * ?")
public void compensateFailedMessages() {
List<Message> failures = messageLogService.findFailedMessages();
failures.forEach(msg -> {
if (msg.getRetryCount() < maxRetries) {
messageQueueService.resend(msg);
} else {
alertService.notify(msg);
}
});
}
方案对比与未来展望
传统方案对比
对比维度 | 传统方案缺陷 | 本方案创新点 | 业务价值 |
---|---|---|---|
架构设计 | 单体架构,烟囱式开发 | 微服务+插件化架构 | 迭代速度提升3倍 |
消息路由 | 静态配置,人工维护路由表 | 动态策略引擎+机器学习推荐 | 送达率提升22% |
容错能力 | 失败后简单重试 | 智能降级+跨机房容灾 | 灾备切换时间<3秒 |
资源消耗 | 独立集群按峰值配置 | 弹性扩缩容+混部调度 | 硬件成本降低45% |
监控体系 | 基础指标监控 | 全链路追踪+根因分析 | MTTR从小时级降到分钟级 |
核心实施策略
-
渐进式演进:通过消息网关实现新旧系统平滑过渡
-
冗余设计:关键组件(如路由引擎)采用集群多活部署
-
自动化治理:熔断规则/流控策略动态配置生效
-
数据驱动:基于历史发送数据优化路由策略
未来优化方向
- 智能路由
-
实时计算渠道健康分(基于延迟/成功率/成本)
-
结合用户画像的个性化送达策略(如夜间不推送)
- 边缘计算
-
在区域数据中心部署消息处理节点
-
减少跨境/跨运营商传输延迟
- 消息语义化
-
自动识别消息紧急程度(如验证码优先路由)
-
敏感内容自动脱敏处理
- 生态扩展
-
对接ChatOps平台(如Slack/飞书机器人)
-
支持Serverless函数消息处理
- 绿色计算
-
闲时资源自动降频
-
基于碳效的渠道选择算法