当前位置: 首页 > news >正文

Go语言事件总线EventBus本地事件总线系统的完整实现框架

在Go语言中,EventBus是一种非常有用的工具,它通过事件驱动的编程方式,帮助开发者实现组件之间的解耦,提高代码的可维护性和扩展性。

背景

  • 软件架构的发展需求:随着软件系统的规模和复杂度不断增大,传统的紧耦合架构在开发、测试、部署和维护等方面都面临着诸多挑战。组件之间的高度依赖使得代码难以修改和扩展,一旦某个组件发生变化,可能需要修改多个相关联的组件。EventBus所代表的事件驱动架构应运而生,它允许组件之间通过事件进行松散耦合的通信,降低了组件之间的依赖关系,使得系统更加灵活和可维护。

  • Go语言的并发特性:Go语言以其强大的并发能力而闻名,goroutine的轻量级和高效的特性使得并发编程变得简单而高效。EventBus与Go语言的并发特性相结合,可以更好地实现异步事件处理,充分发挥Go语言在高并发场景下的优势,提高系统的响应性和吞吐量。

  • 微服务架构的兴起:在微服务架构中,系统被拆分成多个小型的、独立的服务,这些服务之间需要进行高效的通信和协作。EventBus提供了一种轻量级的通信机制,使得微服务之间可以通过发布和订阅事件来实现解耦的交互,避免了服务之间的直接依赖,同时也支持异步的消息传递,提高了系统的可用性和可扩展性。

简介

  • 基本概念:EventBus是一种设计模式,它充当一个中央集散地,负责在事件的发布者和订阅者之间进行消息的传递。在Go语言中,通过使用EventBus库,开发者可以轻松地实现事件的发布、订阅和处理。当某个事件发生时,发布者将事件发送到EventBus,EventBus根据事件的类型或主题,将事件通知给所有订阅了该事件的订阅者,订阅者接收到事件后执行相应的处理逻辑。

  • 主要功能:提供了事件的发布与订阅功能,使得组件之间可以通过事件进行通信,而无需直接调用彼此的方法。支持异步事件处理,订阅者可以根据需要选择同步或异步的方式来处理事件,从而提高系统的响应速度和并发性能。具备事件的过滤和路由功能,可以根据事件的类型、主题或其他条件,将事件精准地分发给感兴趣的订阅者,提高事件处理的效率和准确性。

  • 优势:解耦组件之间的依赖关系,使得各个组件可以独立开发、测试和部署,提高了代码的可维护性和可扩展性。简化了事件驱动编程的实现,通过简单的API调用,就可以实现事件的发布和订阅,降低了开发难度和工作量。支持异步消息处理,可以提高系统的响应性和吞吐量,适用于高并发场景。提供了灵活的事件处理机制,可以满足不同类型和复杂度的业务需求,如支持多种事件类型、通配符订阅等。

安装

确保您的计算机上安装了 Go。 在终端中键入以下命令:

go get github.com/asaskevich/EventBus

之后,就可以在使用EventBus的时候导入包了。

使用

在文件中添加以下行:*.go

import "github.com/asaskevich/EventBus"

如果你不喜欢使用 long ,你可以对其进行起别名来处理:

import (evbus "github.com/asaskevich/EventBus"
)

简单案例

package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)err = bus.Unsubscribe("main:calculator", calculator)if err != nil {fmt.Printf("取消订阅事件 %s 失败: %v\n", "main:calculator", err)}
}

如果你想要面向对象编程进行学习的话,可以对其进行封装处理:

package mainimport ("fmt""github.com/asaskevich/EventBus"
)type Bus struct {EventBus EventBus.Bus
}func calculator(a int, b int) {fmt.Printf("a + b = "+"%d\n", a+b)
}// Subscribe 方法注册事件监听
func (bus *Bus) Subscribe() {err := bus.EventBus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}
}// Publish 方法触发事件
func (bus *Bus) Publish() {bus.EventBus.Publish("main:calculator", 33, 60)
}// UnSubscribe 取消订阅
func (bus *Bus) UnSubscribe() {err := bus.EventBus.Unsubscribe("main:calculator", calculator)if err != nil {fmt.Printf("取消订阅事件 %s 失败: %v\n", "main:calculator", err)}
}func main() {eventBus := EventBus.New()bus := &Bus{EventBus: eventBus}bus.Subscribe()bus.Publish()bus.UnSubscribe()
}

方法

  • New()
  • Subscribe()
  • SubscribeOnce()
  • Unsubscribe()
  • HasCallback()
  • Publish()
  • SubscribeAsync()
  • SubscribeOnceAsync()
  • WaitAsync()

New()

函数签名

func New() EventBus

功能

创建一个新的事件总线实例,该实例用于管理事件的订阅、发布等操作。

应用场景

在需要使用事件总线机制的程序开始时调用,初始化事件总线。

示例代码

package mainimport ("github.com/asaskevich/EventBus"
)func main() {bus := EventBus.New() // 创建事件总线实例// 后续可使用 bus 进行事件订阅和发布操作
}

 Subscribe()

函数签名

func (bus *EventBus) Subscribe(topic string, fn interface{}) error

功能

将一个回调函数订阅到指定的事件主题上。当该主题的事件被发布时,回调函数会被同步调用。

应用场景

适用于需要同步处理事件的场景,例如更新界面状态、记录日志等。

返回错误

如果第二个参数传的不是函数,则会返回错误。

示例代码

package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)
}

 SubscribeOnce()

函数签名

func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error

功能

将一个回调函数订阅到指定的事件主题上,该回调函数只会在事件第一次发布时被调用,之后自动取消订阅。

应用场景

适用于只需要处理一次事件的场景,例如初始化操作、一次性通知等。

返回错误

如果第二个参数传的不是函数,则会返回错误。

示例代码

package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.SubscribeOnce("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)bus.Publish("main:calculator", 20, 40)  // 第二次发布,calculator不会再执行
}

Unsubscribe()

函数签名

func (bus *EventBus) Unsubscribe(topic string, fn interface{}) error

功能

从指定的事件主题中取消订阅指定的回调函数。

应用场景

当不再需要处理某个事件时,调用该函数取消订阅,释放资源。

返回错误

  1. 事件名称不存在:当你尝试取消订阅一个从未被订阅过的事件名称时,Unsubscribe 会返回错误。
  2. 处理函数不匹配:若你尝试用一个和订阅时不同的处理函数来取消订阅,Unsubscribe 也会返回错误。

示例代码

package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}err = bus.Unsubscribe("main:calculator", calculator)if err != nil {fmt.Printf("取消订阅事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40) // 发布事件,calculator 不会再执行
}

HasCallback()

函数签名

func (bus *EventBus) HasCallback(topic string) bool

功能

检查指定的事件主题是否存在已订阅的回调函数。

应用场景

在发布事件前检查是否有订阅者,避免不必要的发布操作;或者在取消订阅前确认是否有回调函数需要取消。

示例代码

package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()// 订阅事件err := bus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)return}// 检查是否有订阅者if bus.HasCallback("main:calculator") {// 有订阅者,发布事件bus.Publish("main:calculator", 20, 40)} else {fmt.Println("没有订阅者,不发布事件")}// 取消订阅err = bus.Unsubscribe("main:calculator", calculator)if err != nil {fmt.Printf("取消订阅事件 %s 失败: %v\n", "main:calculator", err)}
}

Publish()

函数签名

func (bus *EventBus) Publish(topic string, args ...interface{})

功能

发布一个指定主题的事件,并将参数传递给所有订阅该主题的回调函数。

应用场景

在程序中某个特定事件发生时,调用该函数通知所有订阅者。例如,在一个聊天应用中,当服务器收到新消息时,发布消息事件通知所有客户端。

示例代码

package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40) // 发布事件,传递参数 20 和 40
}

SubscribeAsync()

函数签名

func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error

功能

以异步方式订阅某个事件,transactional=true 时按顺序执行,false 时并发执行。

应用场景

用于后台异步处理任务,如写日志、发送邮件等不会阻塞主流程的任务。

返回错误

如果第二个参数传的不是函数,则会返回错误。

示例代码

package mainimport ("fmt""github.com/asaskevich/EventBus""time"
)func calculator(a int, b int) {time.Sleep(1 * time.Second)fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.SubscribeAsync("main:calculator", calculator,false)if err != nil {fmt.Printf("异步处理事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)fmt.Println("Main continues...")bus.WaitAsync() // 等待所有异步回调完成
}

 SubscribeOnceAsync()

函数签名

func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}, transactional bool) error

功能

以异步方式订阅事件,仅触发一次。

应用场景

用于一次性异步初始化、只执行一次的异步钩子或远程调用。

返回错误

如果第二个参数传的不是函数,则会返回错误。

示例代码

package mainimport ("fmt""github.com/asaskevich/EventBus""time"
)func calculator(a int, b int) {time.Sleep(1 * time.Second)fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.SubscribeOnceAsync("main:calculator", calculator, false)if err != nil {fmt.Printf("一次性异步处理事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)bus.Publish("main:calculator", 20, 40) // 不会执行bus.WaitAsync()
}

 WaitAsync()

函数签名

func (bus *EventBus) WaitAsync()

功能

等待所有异步事件处理完成。

应用场景

在程序退出前等待所有异步任务结束,确保不会中断执行中的任务。

示例代码

package mainimport ("fmt""github.com/asaskevich/EventBus""time"
)func calculator(a int, b int) {time.Sleep(1 * time.Second)fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.SubscribeAsync("main:calculator", calculator,false)if err != nil {fmt.Printf("异步处理事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)fmt.Println("Main continues...")bus.WaitAsync()
}

完整示例

package mainimport ("fmt""time""github.com/asaskevich/EventBus"
)func main() {// New()bus := EventBus.New()// Subscribe()bus.Subscribe("math:add", func(a int, b int) {fmt.Printf("Add: %d + %d = %d\n", a, b, a+b)})// SubscribeOnce()bus.SubscribeOnce("notify:once", func() {fmt.Println("This message will be shown only once.")})// HasCallback()if bus.HasCallback("math:add") {fmt.Println("Callback for 'math:add' exists.")}// Publish()bus.Publish("math:add", 10, 20)bus.Publish("notify:once")  // 第一次调用,有输出bus.Publish("notify:once")  // 第二次调用,无输出// Unsubscribe()printHello := func() { fmt.Println("Hello!") }bus.Subscribe("say:hello", printHello)bus.Publish("say:hello")bus.Unsubscribe("say:hello", printHello)bus.Publish("say:hello") // 已取消订阅,无输出// SubscribeAsync()bus.SubscribeAsync("async:greet", func(name string) {time.Sleep(1 * time.Second)fmt.Printf("Hello, %s (from async)\n", name)}, false)// SubscribeOnceAsync()bus.SubscribeOnceAsync("init:once", func() {time.Sleep(1 * time.Second)fmt.Println("Async init done (only once).")}, false)// 异步事件发布bus.Publish("async:greet", "Alice")bus.Publish("init:once")bus.Publish("init:once") // 第二次不会触发// WaitAsync()fmt.Println("Waiting for async handlers to finish...")bus.WaitAsync()fmt.Println("All async tasks completed.")
}

相关文章:

  • Linux防止误关机
  • DO指数GPU版本
  • 一周学会Pandas2之Python数据处理与分析-数据重塑与透视-pivot() - 透视 (长 -> 宽,有限制)
  • cpp this指针
  • Python训练第四十天
  • 「Java教案」数据类型、变量与常量
  • Linux系统-基本指令(4)
  • Linux搭建DNS服务器
  • 基于FashionMnist数据集的自监督学习(生成式自监督学习AE算法)
  • C++基础算法————贪心
  • 那些常用的运维工具
  • b. 组合数
  • C++:参数传递方法(Parameter Passing Methods)
  • 用户认证的魔法配方:从模型设计到密码安全的奇幻之旅
  • HackMyVM-First
  • Linux【工具 04】Java等常用工具的多版本管理工具SDKMAN安装使用实例
  • SpringBoot整合MyBatis完整实践指南
  • Android任务栈管理策略总结
  • # CppCon 2014 学习: Quick game development with C++11/C++14
  • 构建多模型协同的Ollama智能对话系统
  • 亚马逊做网站发礼物换评价/西安市网站
  • 网站建设怎么申请域名/app营销策略有哪些
  • 游戏ui素材网站/2021最火营销方案
  • 长沙网站建设长沙/网络营销推广工具有哪些
  • 网站建设公司江西/2023年6月份疫情严重吗
  • 广州网站建设出名 乐云践新/网站推广计划