当前位置: 首页 > news >正文

.Net Core基于EasyCore.EventBus实现事件总线

目录

1. CAP 理论(CAP Theorem)

1.1 一致性(Consistency):

1.2 可用性(Availability):

1.3 分区容忍性(Partition Tolerance):

1.4 CAP 定理的关键点:

1.5 CAP 理论的影响:

2. CAP的实际应用场景

2.1 CAP 模式选择:

3. AP 理论(Availability + Partition Tolerance)

4.EventBus与AP模型的实现

4.1 EventBus简介

4.2 EventBus与AP模型的关系

4.2.1 可用性(Availability)

4.2.2 分区容忍性(Partition Tolerance)

4.2.3 EventBus的容错性与一致性

4.3 EventBus实现AP模型的关键技术

4.3.1 使用分布式消息队列

4.3.2 事件存储与持久化

4.3.3 消息路由与负载均衡

5. 使用EasyCore.EventBus实现事件总线

5.1 简介

5.2 使用EasyCore.EventBus

5.2.1 Program

5.2.2 事件消息类

5.2.3 发布事件消息

5.2.4 事件消费实现

5.2.5 运行测试

6. 削峰填谷


1. CAP 理论(CAP Theorem)

CAP 理论,也叫做 布鲁尔定理(Brewer's Theorem),由计算机科学家 Eric Brewer 提出。CAP 理论指出,在分布式系统中,任何系统只能在以下三者中选取两个来优化,而不能同时保证三者的完美平衡:

  • C: 一致性(Consistency)

  • A: 可用性(Availability)

  • P: 分区容忍性(Partition Tolerance)

1.1 一致性(Consistency):

所有的节点在同一时刻看到的数据是相同的。

换句话说,一个系统保持一致性意味着,如果你从任何节点读取数据,你将看到最新写入的数据(或者任何其他写入操作的结果)。

1.2 可用性(Availability):

每个请求都会在合理的时间内得到响应,不管数据是否是最新的。

即使某些节点出现故障,系统也能保证其正常工作,并返回响应。系统会尽量保证每个请求都能得到响应,哪怕是一些不一致的结果。

1.3 分区容忍性(Partition Tolerance):

系统在发生网络分区的情况下依然能继续操作。

网络分区意味着某些节点无法与其他节点通信。在这种情况下,系统仍然应该保持操作的能力,不会完全崩溃。

1.4 CAP 定理的关键点:

  • CAP 定理认为,分布式系统不能同时做到一致性、可用性和分区容忍性三者兼得

  • 你只能选择两个属性:

    • CA(一致性 + 可用性): 适用于网络中没有分区的系统。

    • CP(一致性 + 分区容忍性): 即使有网络分区,系统依然保证一致性,但有可能会牺牲可用性。

    • AP(可用性 + 分区容忍性): 系统能保证在网络分区的情况下仍然保持可用性,但可能会牺牲一致性。

1.5 CAP 理论的影响

  • 选择一个系统的设计时,我们通常需要权衡这三个属性。例如:

    • 传统的关系型数据库(如 MySQL)通常关注一致性(Consistency)和可用性(Availability),并不特别关注分区容忍性(Partition Tolerance)。

    • 分布式系统CassandraCouchbase 可能会选择牺牲部分一致性以确保更高的可用性和分区容忍性。

2. CAP的实际应用场景

2.1 CAP 模式选择:

  • CA(Consistency + Availability)

    • 场景:适用于不涉及网络分区的分布式系统,或者在网络分区的情况下可以关闭部分服务(例如,一些小型数据库系统)。比如单一数据中心的传统关系型数据库可以选择一致性和可用性。

  • CP(Consistency + Partition tolerance)

    • 场景:适用于网络分区的情况下,一致性是首要考虑的场景。这类系统不会在网络分区的情况下牺牲一致性,通常会在分区时选择暂停服务。比如 ZookeeperHBase 是一致性和分区容忍的代表。

  • AP(Availability + Partition tolerance)

    • 场景:适用于需要高可用性和分区容忍性,同时可以容忍某些程度的不一致性。比如 CassandraMongoDBCouchbase 等系统,它们选择了可用性和分区容忍性,但会牺牲数据的一致性,特别是在高并发的情况下。

3. AP 理论(Availability + Partition Tolerance)

AP 理论是 CAP 理论的一部分,指的是 在分布式系统中,系统选择了可用性(Availability)和分区容忍性(Partition Tolerance),并牺牲了数据一致性(Consistency)

解释:

  • 可用性:系统保证每次请求都会返回一个响应。

  • 分区容忍性:即使系统出现网络分区,依然可以继续提供服务。

AP 模式下,系统会尽力保证可用性分区容忍性,而可能会牺牲一致性。这意味着,在某些情况下,系统中的节点可能不会看到相同的数据。

示例:

一个典型的 AP 系统就是 Cassandra。Cassandra 在设计上优先保证可用性和分区容忍性,特别是在分布式环境中,系统容忍某些节点暂时不可用而继续提供服务,但在这个过程中可能会有部分数据的不一致。

优点:

  • 高可用性:即使某些节点不可用,系统依然能保证服务不宕机,尽量返回响应。

  • 容忍网络分区:系统能容忍网络分区,继续提供服务。

缺点:

  • 可能的不一致性:由于不同节点的数据可能不同步,可能会读取到过期的数据,导致一致性问题。数据更新和同步可能会发生延迟。

4.EventBus与AP模型的实现

在分布式系统的设计中,EventBus(事件总线)是一个重要的组件,它允许不同的服务或模块之间通过事件进行异步通信。在微服务架构中,EventBus通常用于解耦和简化系统的模块化和扩展。然而,EventBus的设计和实现与分布式系统中的一致性、可用性和分区容忍性(即CAP理论)有着紧密的关系。在本文中,我们将深入探讨EventBus的概念、它是如何实现AP(Availability and Partition tolerance)模型的,及其在分布式系统中的重要性。

4.1 EventBus简介

EventBus是一个用于实现事件驱动架构(EDA)的机制,它允许不同组件之间基于事件进行通信。它通常遵循发布/订阅模式,其中:

  • 发布者(Publisher)发布事件。

  • 订阅者(Subscriber)接收并处理事件。

这种模式确保了松耦合的通信模型。发布者无需了解谁是事件的接收者,而订阅者也不需要知道事件的源头。事件总线充当了事件传播的中介角色,确保事件从发布者传递到所有相关订阅者。

EventBus在微服务架构中尤为重要,因为它允许跨服务的异步通信,避免了服务之间的紧密耦合。在分布式系统中,EventBus通常会与消息中间件(如Kafka、RabbitMQ、ActiveMQ等)结合使用,实现可靠的消息传递

4.2 EventBus与AP模型的关系

EventBus通常被设计为支持AP模型的系统。以下是EventBus如何实现AP模型的原因:

4.2.1 可用性(Availability)

EventBus通常是基于消息队列实现的,消息队列系统的一个重要特点就是高可用性。在AP模型下,EventBus保证每次事件的发布都会有响应,不会因为网络或其他故障导致系统停滞。以下是EventBus实现可用性的一些关键特性:

  1. 事件的异步处理

    • 事件的发布和处理通常是异步的,即使某个订阅者没有及时处理事件,其他订阅者仍然可以处理。即使部分订阅者因为故障无法处理事件,事件发布者依然可以继续发布事件,这保证了系统的可用性。

  2. 事件队列与消费者

    • 在使用消息队列的系统中,事件会被写入队列,消费者从队列中读取并处理事件。即使某些消费者失败或无法及时处理,消息队列仍然会保存事件,直到消费者能够恢复并消费事件。

  3. 事件的缓存和重试机制

    • 在高可用的EventBus中,消息可能会被缓存,直到它们被成功处理。若消息处理失败,EventBus可能会启用重试机制,确保事件最终被处理。

4.2.2 分区容忍性(Partition Tolerance)

分区容忍性是分布式系统中非常重要的一个特性。在分布式环境中,网络分区是不可避免的,EventBus的设计必须保证即使在分区的情况下,系统仍能继续运行。以下是EventBus如何实现分区容忍性的几个关键特性:

  1. 分布式架构

    • EventBus通常是基于分布式消息中间件实现的,消息可以在多个节点之间传递。在Kafka、RabbitMQ等消息队列系统中,事件被分发到多个节点,确保即使部分节点不可用,其他节点仍能继续工作。这保证了即使出现网络分区,EventBus也能继续提供服务。

  2. 消息持久化

    • 消息队列系统会将消息持久化到磁盘中,即使发生网络分区,消息依然能够被保存在磁盘中,待网络恢复后再进行处理。这样,EventBus能够容忍网络分区并保证事件不会丢失。

  3. 冗余和副本机制

    • 许多消息队列系统(例如Kafka)使用冗余和副本机制来确保事件数据的可靠性。当分区中的某个副本不可用时,其他副本会继续工作,保证了分区容忍性。

  4. 消息路由与负载均衡

    • EventBus能够动态调整消息的路由和负载均衡策略,确保即使某些节点因分区不可用,其他节点仍然能够处理消息。这是通过消息队列系统中的分区复制机制实现的。

4.2.3 EventBus的容错性与一致性

虽然EventBus通常设计为支持AP模型,但它并不意味着放弃一致性。在分布式环境中,EventBus系统可能会根据不同场景选择不同的一致性策略:

  1. 最终一致性

    • 在AP模型下,一致性通常是最终一致性,即系统中的数据可能在短时间内出现不一致,但最终会达到一致状态。这意味着,即使某个订阅者未能及时消费某个事件,其他订阅者仍然会处理事件,而事件最终会被所有订阅者处理。

  2. 事务性消息

    • 在一些高可靠性要求的场景中,EventBus可以结合事务机制来确保消息的一致性。例如,Kafka支持事务性消息,可以保证一组消息要么全部成功,要么全部失败,确保消息的原子性。

  3. 消息幂等性

    • 在EventBus系统中,订阅者通常需要处理幂等性的问题。即使同一个事件被多次消费,订阅者应该保证事件的处理结果一致。许多消息队列系统提供了幂等性机制,以确保事件的处理不会造成副作用。

4.3 EventBus实现AP模型的关键技术

4.3.1 使用分布式消息队列

分布式消息队列是实现EventBus的关键技术,它提供了可靠的消息传递机制,保证了事件的异步处理、重试机制和分区容忍性。常见的分布式消息队列有:

  • Kafka:Kafka采用了分区和副本机制来实现高可用和分区容忍,适合大规模的分布式事件传递。

  • RabbitMQ:RabbitMQ通过集群和镜像队列实现高可用,并且支持消息的持久化和重试。

4.3.2 事件存储与持久化

事件存储是EventBus系统中不可或缺的一部分。为了保证分区容忍性,事件需要被持久化,以防止消息丢失。消息队列系统通常提供持久化机制,将消息保存在磁盘或数据库中,直到被消费者处理。

4.3.3 消息路由与负载均衡

EventBus通过智能的消息路由和负载均衡策略,将事件分发到合适的消费者。通过这种方式,即使某些消费者因故障无法处理消息,其他消费者仍然能够继续处理。消息队列系统通常提供自动分区和分片机制,支持动态的负载均衡。

5. 使用EasyCore.EventBus实现事件总线

5.1 简介

EasyCore.EventBus提供了多个消息队列支持包

EasyCore.EventBus
EasyCore.EventBus.Kafka
EasyCore.EventBus.Pulsar
EasyCore.EventBus.RabbitMQ
EasyCore.EventBus.RedisStreams

可根据需要下载对应的支持包,支持的队列有Kafka、Pulsar、RabbitMQ、RedisStreams。支持本地EventBus和分布式EventBus,本地EventBus只需要引用EventBus包即可。分布式EventBus引用对应消息队列的支持包。

5.2 使用EasyCore.EventBus

5.2.1 Program

nuget引入EasyCore.EventBus支持包

Program中注册EasyCore.EventBus(本文使用的是基于RabbitMQ)

using EasyCore.EventBus;
using EasyCore.EventBus.RabbitMQ;namespace Web.RabbitMQ
{public class Program{public static void Main(string[] args){var builder = WebApplication.CreateBuilder(args);builder.Services.AddControllers();builder.Services.AddEndpointsApiExplorer();builder.Services.AddSwaggerGen();builder.Services.EasyCoreEventBus(options =>{options.RabbitMQ("localhost");});var app = builder.Build();if (app.Environment.IsDevelopment()){app.UseSwagger();app.UseSwaggerUI();}app.UseAuthorization();app.MapControllers();app.Run();}}
}

5.2.2 事件消息类

using EasyCore.EventBus.Event;namespace Web.RabbitMQ.Publish
{public class WebEventMessage : IEvent{public string? Message { get; set; }}
}

事件消息类需要继承IEvent接口。

5.2.3 发布事件消息

using EasyCore.EventBus.Distributed;
using Microsoft.AspNetCore.Mvc;namespace Web.RabbitMQ.Publish.Controllers
{[Route("api/[controller]")][ApiController]public class PublishController : ControllerBase{private readonly IDistributedEventBus _distributedEventBus;public PublishController(IDistributedEventBus distributedEventBus){_distributedEventBus = distributedEventBus;}[HttpPost]public async Task Publish(){var em = new WebEventMessage(){Message = "Hello, world!"};await _distributedEventBus.PublishAsync(em);}}
}

依赖注入传入IDistributedEventBus接口,IDistributedEventBus接口提供了两个消息发布方法。

using System.Threading.Tasks;
using EasyCore.EventBus.Event;namespace EasyCore.EventBus.Distributed;public interface IDistributedEventBus
{Task<bool> PublishAsync<TEvent>(TEvent eventMessage) where TEvent : IEvent;bool Publish<TEvent>(TEvent eventMessage) where TEvent : IEvent;
}

5.2.4 事件消费实现

using EasyCore.EventBus.Event;namespace Web.RabbitMQ.Publish
{public class MyEventMessage : IDistributedEventHandler<WebEventMessage>{private readonly ILogger<MyEventMessage> _logger;public MyEventMessage(ILogger<MyEventMessage> logger){_logger = logger;}public async Task HandleAsync(WebEventMessage eventMessage){_logger.LogInformation($"Received event message: {eventMessage.Message}--{Guid.NewGuid()}");await Task.CompletedTask;}}
}

事件消费类需要继承IDistributedEventHandler<T>接口,泛型T就是我们要消费的消息。并实现继承接口的方法。

5.2.5 运行测试

docker启动一个RabbitMQ

docker run -d  --name rabbitmq -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 15672:15672 -p 5672:5672 rabbitmq:3-management

访问 http://localhost:15672 验证RabbitMQ是否成功启动,输入账号密码均是 guest 

启动运行项目

此时我们可以看到,消息已经发送成功,并且被成功消费。

6. 削峰填谷

EasyCore.EventBus其中的EasyCore.EventBus.RabbitMQ还可以当做削峰填谷来处理并发请求。

EasyCore.EventBus还支持多种队列,可根据自身需求进行选择下载。

http://www.dtcms.com/a/545440.html

相关文章:

  • 公司怎么做网站推广郑州包装设计公司
  • 阿里云服务器上构建基于PoS的以太坊2.0私有链
  • 如何把网站推广出编程代码怎么学
  • C++ 单调栈
  • 电商网站开发 上海wordpress 登陆 没反应
  • 服务器网站备案wordpress三道杠菜单
  • mysql upsert 用法(批量保存或更新)
  • 海康相机与机器人标定
  • 十年后,AI会赋予工业怎样的力量?
  • 西安市建设协会网站高级搜索入口
  • 东莞个人网站推广建设做中东市场哪个网站合适
  • Spring Boot 3 整合 LiteFlow:轻量级流程编排框架学习
  • Rust:WebSocket支持的实现
  • 代刷开通建设网站Wordpress怎么添加购买页面
  • 做网站几个步骤网址推荐你会感谢我的
  • 黑马商城day7-消息可靠性
  • wpsapi
  • Postman实现jwt发送请求
  • 网站正在备案什么是网络营销 职能是什么
  • 【AI】Prompt 提示词工程
  • R语言高效数据处理-3个自定义函数笔记
  • 石家庄做网站备案有哪些公司品牌广告公司网站建设
  • 纯静态网站怎么入侵报告王妃
  • 郑州微盟网站建设公司网站建设的目的和目标
  • 仓颉中的字符串常用方法:语义一致性与高性能的设计哲学
  • 新MCU开发板快速上手指南:从开箱到精通
  • NestJS 项目创建
  • Apache Spark算法开发指导-特征转换-StandardScaler
  • 两个2的n次幂相加
  • 实时Java规范(RTSJ):从理论到实践的实时系统编程范式