Go语言事件总线EventBus本地事件总线系统的完整实现框架
在Go语言中,EventBus是一种非常有用的工具,它通过事件驱动的编程方式,帮助开发者实现组件之间的解耦,提高代码的可维护性和扩展性。
背景
-
软件架构的发展需求:随着软件系统的规模和复杂度不断增大,传统的紧耦合架构在开发、测试、部署和维护等方面都面临着诸多挑战。组件之间的高度依赖使得代码难以修改和扩展,一旦某个组件发生变化,可能需要修改多个相关联的组件。EventBus所代表的事件驱动架构应运而生,它允许组件之间通过事件进行松散耦合的通信,降低了组件之间的依赖关系,使得系统更加灵活和可维护。
-
Go语言的并发特性:Go语言以其强大的并发能力而闻名,goroutine的轻量级和高效的特性使得并发编程变得简单而高效。EventBus与Go语言的并发特性相结合,可以更好地实现异步事件处理,充分发挥Go语言在高并发场景下的优势,提高系统的响应性和吞吐量。
-
微服务架构的兴起:在微服务架构中,系统被拆分成多个小型的、独立的服务,这些服务之间需要进行高效的通信和协作。EventBus提供了一种轻量级的通信机制,使得微服务之间可以通过发布和订阅事件来实现解耦的交互,避免了服务之间的直接依赖,同时也支持异步的消息传递,提高了系统的可用性和可扩展性。
简介
-
基本概念:EventBus是一种设计模式,它充当一个中央集散地,负责在事件的发布者和订阅者之间进行消息的传递。在Go语言中,通过使用EventBus库,开发者可以轻松地实现事件的发布、订阅和处理。当某个事件发生时,发布者将事件发送到EventBus,EventBus根据事件的类型或主题,将事件通知给所有订阅了该事件的订阅者,订阅者接收到事件后执行相应的处理逻辑。
-
主要功能:提供了事件的发布与订阅功能,使得组件之间可以通过事件进行通信,而无需直接调用彼此的方法。支持异步事件处理,订阅者可以根据需要选择同步或异步的方式来处理事件,从而提高系统的响应速度和并发性能。具备事件的过滤和路由功能,可以根据事件的类型、主题或其他条件,将事件精准地分发给感兴趣的订阅者,提高事件处理的效率和准确性。
-
优势:解耦组件之间的依赖关系,使得各个组件可以独立开发、测试和部署,提高了代码的可维护性和可扩展性。简化了事件驱动编程的实现,通过简单的API调用,就可以实现事件的发布和订阅,降低了开发难度和工作量。支持异步消息处理,可以提高系统的响应性和吞吐量,适用于高并发场景。提供了灵活的事件处理机制,可以满足不同类型和复杂度的业务需求,如支持多种事件类型、通配符订阅等。
安装
确保您的计算机上安装了 Go。 在终端中键入以下命令:
go get github.com/asaskevich/EventBus
之后,就可以在使用EventBus的时候导入包了。
使用
在文件中添加以下行:*.go
import "github.com/asaskevich/EventBus"
如果你不喜欢使用 long ,你可以对其进行起别名来处理:
import (evbus "github.com/asaskevich/EventBus"
)
简单案例
package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)err = bus.Unsubscribe("main:calculator", calculator)if err != nil {fmt.Printf("取消订阅事件 %s 失败: %v\n", "main:calculator", err)}
}
如果你想要面向对象编程进行学习的话,可以对其进行封装处理:
package mainimport ("fmt""github.com/asaskevich/EventBus"
)type Bus struct {EventBus EventBus.Bus
}func calculator(a int, b int) {fmt.Printf("a + b = "+"%d\n", a+b)
}// Subscribe 方法注册事件监听
func (bus *Bus) Subscribe() {err := bus.EventBus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}
}// Publish 方法触发事件
func (bus *Bus) Publish() {bus.EventBus.Publish("main:calculator", 33, 60)
}// UnSubscribe 取消订阅
func (bus *Bus) UnSubscribe() {err := bus.EventBus.Unsubscribe("main:calculator", calculator)if err != nil {fmt.Printf("取消订阅事件 %s 失败: %v\n", "main:calculator", err)}
}func main() {eventBus := EventBus.New()bus := &Bus{EventBus: eventBus}bus.Subscribe()bus.Publish()bus.UnSubscribe()
}
方法
- New()
- Subscribe()
- SubscribeOnce()
- Unsubscribe()
- HasCallback()
- Publish()
- SubscribeAsync()
- SubscribeOnceAsync()
- WaitAsync()
New()
函数签名
func New() EventBus
功能
创建一个新的事件总线实例,该实例用于管理事件的订阅、发布等操作。
应用场景
在需要使用事件总线机制的程序开始时调用,初始化事件总线。
示例代码
package mainimport ("github.com/asaskevich/EventBus"
)func main() {bus := EventBus.New() // 创建事件总线实例// 后续可使用 bus 进行事件订阅和发布操作
}
Subscribe()
函数签名
func (bus *EventBus) Subscribe(topic string, fn interface{}) error
功能
将一个回调函数订阅到指定的事件主题上。当该主题的事件被发布时,回调函数会被同步调用。
应用场景
适用于需要同步处理事件的场景,例如更新界面状态、记录日志等。
返回错误
如果第二个参数传的不是函数,则会返回错误。
示例代码
package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)
}
SubscribeOnce()
函数签名
func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error
功能
将一个回调函数订阅到指定的事件主题上,该回调函数只会在事件第一次发布时被调用,之后自动取消订阅。
应用场景
适用于只需要处理一次事件的场景,例如初始化操作、一次性通知等。
返回错误
如果第二个参数传的不是函数,则会返回错误。
示例代码
package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.SubscribeOnce("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)bus.Publish("main:calculator", 20, 40) // 第二次发布,calculator不会再执行
}
Unsubscribe()
函数签名
func (bus *EventBus) Unsubscribe(topic string, fn interface{}) error
功能
从指定的事件主题中取消订阅指定的回调函数。
应用场景
当不再需要处理某个事件时,调用该函数取消订阅,释放资源。
返回错误
- 事件名称不存在:当你尝试取消订阅一个从未被订阅过的事件名称时,Unsubscribe 会返回错误。
- 处理函数不匹配:若你尝试用一个和订阅时不同的处理函数来取消订阅,Unsubscribe 也会返回错误。
示例代码
package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}err = bus.Unsubscribe("main:calculator", calculator)if err != nil {fmt.Printf("取消订阅事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40) // 发布事件,calculator 不会再执行
}
HasCallback()
函数签名
func (bus *EventBus) HasCallback(topic string) bool
功能
检查指定的事件主题是否存在已订阅的回调函数。
应用场景
在发布事件前检查是否有订阅者,避免不必要的发布操作;或者在取消订阅前确认是否有回调函数需要取消。
示例代码
package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()// 订阅事件err := bus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)return}// 检查是否有订阅者if bus.HasCallback("main:calculator") {// 有订阅者,发布事件bus.Publish("main:calculator", 20, 40)} else {fmt.Println("没有订阅者,不发布事件")}// 取消订阅err = bus.Unsubscribe("main:calculator", calculator)if err != nil {fmt.Printf("取消订阅事件 %s 失败: %v\n", "main:calculator", err)}
}
Publish()
函数签名
func (bus *EventBus) Publish(topic string, args ...interface{})
功能
发布一个指定主题的事件,并将参数传递给所有订阅该主题的回调函数。
应用场景
在程序中某个特定事件发生时,调用该函数通知所有订阅者。例如,在一个聊天应用中,当服务器收到新消息时,发布消息事件通知所有客户端。
示例代码
package mainimport ("fmt""github.com/asaskevich/EventBus"
)func calculator(a int, b int) {fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.Subscribe("main:calculator", calculator)if err != nil {fmt.Printf("订阅事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40) // 发布事件,传递参数 20 和 40
}
SubscribeAsync()
函数签名
func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error
功能
以异步方式订阅某个事件,transactional=true
时按顺序执行,false
时并发执行。
应用场景
用于后台异步处理任务,如写日志、发送邮件等不会阻塞主流程的任务。
返回错误
如果第二个参数传的不是函数,则会返回错误。
示例代码
package mainimport ("fmt""github.com/asaskevich/EventBus""time"
)func calculator(a int, b int) {time.Sleep(1 * time.Second)fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.SubscribeAsync("main:calculator", calculator,false)if err != nil {fmt.Printf("异步处理事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)fmt.Println("Main continues...")bus.WaitAsync() // 等待所有异步回调完成
}
SubscribeOnceAsync()
函数签名
func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}, transactional bool) error
功能
以异步方式订阅事件,仅触发一次。
应用场景
用于一次性异步初始化、只执行一次的异步钩子或远程调用。
返回错误
如果第二个参数传的不是函数,则会返回错误。
示例代码
package mainimport ("fmt""github.com/asaskevich/EventBus""time"
)func calculator(a int, b int) {time.Sleep(1 * time.Second)fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.SubscribeOnceAsync("main:calculator", calculator, false)if err != nil {fmt.Printf("一次性异步处理事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)bus.Publish("main:calculator", 20, 40) // 不会执行bus.WaitAsync()
}
WaitAsync()
函数签名
func (bus *EventBus) WaitAsync()
功能
等待所有异步事件处理完成。
应用场景
在程序退出前等待所有异步任务结束,确保不会中断执行中的任务。
示例代码
package mainimport ("fmt""github.com/asaskevich/EventBus""time"
)func calculator(a int, b int) {time.Sleep(1 * time.Second)fmt.Printf("%d\n", a+b)
}func main() {bus := EventBus.New()err := bus.SubscribeAsync("main:calculator", calculator,false)if err != nil {fmt.Printf("异步处理事件 %s 失败: %v\n", "main:calculator", err)}bus.Publish("main:calculator", 20, 40)fmt.Println("Main continues...")bus.WaitAsync()
}
完整示例
package mainimport ("fmt""time""github.com/asaskevich/EventBus"
)func main() {// New()bus := EventBus.New()// Subscribe()bus.Subscribe("math:add", func(a int, b int) {fmt.Printf("Add: %d + %d = %d\n", a, b, a+b)})// SubscribeOnce()bus.SubscribeOnce("notify:once", func() {fmt.Println("This message will be shown only once.")})// HasCallback()if bus.HasCallback("math:add") {fmt.Println("Callback for 'math:add' exists.")}// Publish()bus.Publish("math:add", 10, 20)bus.Publish("notify:once") // 第一次调用,有输出bus.Publish("notify:once") // 第二次调用,无输出// Unsubscribe()printHello := func() { fmt.Println("Hello!") }bus.Subscribe("say:hello", printHello)bus.Publish("say:hello")bus.Unsubscribe("say:hello", printHello)bus.Publish("say:hello") // 已取消订阅,无输出// SubscribeAsync()bus.SubscribeAsync("async:greet", func(name string) {time.Sleep(1 * time.Second)fmt.Printf("Hello, %s (from async)\n", name)}, false)// SubscribeOnceAsync()bus.SubscribeOnceAsync("init:once", func() {time.Sleep(1 * time.Second)fmt.Println("Async init done (only once).")}, false)// 异步事件发布bus.Publish("async:greet", "Alice")bus.Publish("init:once")bus.Publish("init:once") // 第二次不会触发// WaitAsync()fmt.Println("Waiting for async handlers to finish...")bus.WaitAsync()fmt.Println("All async tasks completed.")
}