当前位置: 首页 > news >正文

Go 服务如何“主动”通知用户?SSE广播与断线重连实战

各位 Gopher 们!你们是否曾遇到过这样的场景:

  • 你正在开发一个后台监控系统,想让 CPU 使用率、内存占用这些数据实时展现在前端,但只能让前端小哥每隔几秒就发一次请求,把服务器累得够呛?
  • 你想做一个类似微博、Twitter 的信息流,当有新消息时,能立刻“叮”一下推送到用户页面上,而不是等用户抓耳挠腮地手动刷新?
  • 或者,你只是想简单地通知用户:“您的外卖已由【帅哥张三】取走,正飞速奔向您!”,而不是让用户在订单页面望眼欲穿?

如果你对以上任何一个问题点了头,那么恭喜你,你可能一直在用“轮询”这个老办法。这就像是你派了个小弟,每五秒钟跑去厨房问一次:“饭好了没?”。不仅小弟跑断腿,厨师也烦得不行。

难道就没有更优雅的办法吗?当然有!今天的主角——Server-Sent Events (SSE),就是来拯救我们的!而我们要介绍的这个 Go SSE 库,更是能让你“一键”拥有这项超能力!

什么是 SSE?它和 WebSocket 有啥不一样?

在深入代码之前,我们先用大白话聊聊原理。

SSE (Server-Sent Events),顾名思义,就是“服务器发送的事件”。它建立在普通的 HTTP 连接上,但这个连接是“长连接”,而且是单向的。

把它想象成一个电台广播

  • 服务器 就是那个 24 小时不间断播报的电台。
  • 客户端(浏览器) 就是收音机。

一旦你把收音机调到正确的频道(建立连接),电台(服务器)就可以随时给你播送新闻、音乐(发送数据),而你不需要每分钟都打电话去问:“有新节目吗?”。

那它和 WebSocket 有啥区别呢?

  • SSE:是单行道。只能服务器往客户端推数据。简单、轻量,基于标准 HTTP,天生支持断线重连。非常适合那些只需要服务器向客户端推送信息的场景。
  • WebSocket:是双向高速公路。客户端和服务器可以随时互相“喊话”。功能更强大,但协议也更复杂。适合做在线聊天、协同编辑这种需要频繁双向沟通的场景。

总的来说,如果你的需求是“服务器 -> 客户端”的单向通知,那么 SSE 就是那个更简单、更对症的“轮子”。

这个 Go 库有哪些功能?

市面上实现 SSE 的库不少, 这个 sse 库真的太贴心了,就像一个全能管家:

  • 性能强劲:底层设计优秀,能轻松管理成千上万的客户端连接。
  • 断线自动重连:网络抖动?用户手滑关了页面又打开?别怕!这个库内置了自动重连和事件重发的机制,重要的消息一条都不会丢!(需要配合持久化存储)
  • 消息持久化:可以将历史事件存到 Redis、MySQL 或任何你喜欢的地方,妈妈再也不用担心服务器重启后消息丢失了。
  • 自带心跳包:自动检测“僵尸连接”,及时清理,保持连接池的健康。
  • 广播与单播:既可以给指定的一个或多个用户“说悄悄话”,也可以向所有在线用户“大声广播”。

听起来是不是很酷?别急,上代码的感觉更酷!

三分钟上手:搭建你的第一个 SSE 服务

让我们用一个简单的例子,看看用 sse 库快速搭建一个服务有多简单。假设我们要搭建一个每 5 秒钟向所有客户端广播一句“Hello World”的服务。

1. 服务端代码 (server.go)

你需要一个 Go 环境,并安装 Gin 框架(这个例子里用到了 Gin,当然你也可以用 Go 自带的 net/http)。

go get github.com/gin-gonic/gin
go get github.com/go-dev-frame/sponge/pkg/sse

然后,创建 main.go 文件:

package mainimport ("fmt""net/http""strconv""time""math/rand""github.com/gin-gonic/gin""github.com/go-dev-frame/sponge/pkg/sse"
)func main() {// 1. 初始化我们的 SSE "广播中心" (Hub)// 把它想象成那个电台的总控制室hub := sse.NewHub()defer hub.Close()// 2. 用 Gin 创建一个 Web 服务器r := gin.Default()// 3. 创建一个 "/events" 接口,让客户端来“收听广播”r.GET("/events", func(c *gin.Context) {fmt.Println("新听众加入!")// 这里为了演示,我们给每个连接的客户端随机分配一个IDuid := strconv.Itoa(rand.Intn(999) + 1000)hub.Serve(c, uid)})// 4. [可选] 创建一个接口,可以手动触发广播// 你可以用 curl 命令来测试:// curl -X POST -H "Content-Type: application/json" -d '{"events":[{"event":"message","data":"这是一条手动广播!"}]}' http://localhost:8080/pushr.POST("/push", hub.PushEventHandler())// 5. 启动一个不知疲倦的“播报员” (goroutine)go func() {i := 0for {// 每 5 秒钟准备一条新消息time.Sleep(time.Second * 5)i++msg := "大家好,我是第 " + strconv.Itoa(i) + " 条自动广播!"// 创建一个标准事件event := &sse.Event{Event: "message", // 事件类型,可以自定义Data:  msg,}// 调用 hub.Push 进行广播 (uid 列表传 nil 就是广播给所有人)fmt.Printf("正在广播: %s\n", msg)_ = hub.Push(nil, event) }}()fmt.Println("SSE 服务器已在 http://localhost:8080 启动")// 启动服务器if err := http.ListenAndServe(":8080", r); err != nil {panic(err)}
}

看,是不是超级清晰?初始化 Hub -> 创建连接点 -> 推送消息,搞定!

2. 客户端代码 (client.go)

现在,我们需要一个“收音机”来接收消息。这个库同样提供了客户端实现,非常方便。

package mainimport ("fmt""github.com/go-dev-frame/sponge/pkg/sse"
)func main() {url := "http://localhost:8080/events"// 1. 创建一个 SSE 客户端,指向我们的服务器地址client := sse.NewClient(url)// 2. 注册一个事件监听器// 告诉客户端:“一旦你收到了类型为 'message' 的事件,就执行下面的函数”client.OnEvent("message", func(event *sse.Event) {// event.Data 就是我们从服务器收到的消息内容fmt.Printf("收到了新广播!内容: 【%s】, ID: %s\n", event.Data, event.ID)})// 3. 开始连接!err := client.Connect()if err != nil {fmt.Printf("连接失败了,呜呜呜: %v\n", err)return}fmt.Println("收音机已打开,正在等待广播... (按 Ctrl+C 退出)")// 阻塞主程序,等待客户端退出<-client.Wait()
}

现在,先运行 go run server.go,然后打开另一个终端运行 go run client.go

你会看到,客户端每隔 5 秒就会打印出一条来自服务器的新消息,完全不需要客户端做任何多余的操作!这就是 SSE 的魅力!

当然也可以使用其他客户端来测试。

进阶玩法:让你的 SSE 服务更强大

SSE 库的强大之处远不止于此。

场景一:我不想丢失任何一条消息!

想象一下,你的服务正在推送重要的股票价格。如果客户端因为网络问题断开了 10 秒,他可能会错过一个亿!

这时,持久化存储事件重发 功能就派上用场了。

你只需要实现一个简单的 Store 接口,告诉 sse 库如何保存和读取事件(比如用 Redis)。

// 伪代码:实现一个你自己的 Store
type MyRedisStore struct{ /* ... redis client ... */ }func (s *MyRedisStore) Save(ctx context.Context, e *sse.Event) error {// 把 event 序列化成 JSON 存到 Redis 的 List 或 ZSet 里return nil 
}func (s *MyRedisStore) ListByLastID(ctx context.Context, eventType string, lastID string, pageSize int) ([]*sse.Event, string, error) {// 根据客户端上次收到的 lastID,从 Redis 里查询之后的新事件return events, nextLastID, nil
}// 初始化 Hub 时,带上你的存储和重发配置
hub := sse.NewHub(sse.WithStore(&MyRedisStore{}), // 使用你的 Redis 存储sse.WithEnableResendEvents(),  // 开启断线重发功能!
)

就这么简单!现在,当客户端断线重连时,它会自动带上它收到的最后一条消息的 ID。服务器看到后,就会从你的 Redis 里把所有错过的消息一次性补发给它。一个亿保住了!

场景二:我想知道消息有没有成功推送到

有时候,你想知道推送给某个特定用户的消息是否失败了(比如那个用户已经下线了)。你可以设置一个“失败回调函数”。

failedHandler := func(uid string, event *sse.Event) {// 这里的代码会在推送失败时执行log.Printf("哎呀,给用户 %s 推送消息 %s 失败了!可以记录下来,稍后重试。", uid, event.ID)
}hub := sse.NewHub(sse.WithPushFailedHandleFn(failedHandler))

这样,你就可以对推送失败的事件进行记录、告警或者其他的补偿操作了。

总结

Server-Sent Events (SSE) 是构建现代实时应用的利器,尤其是在处理服务器到客户端的单向数据流时,它比 WebSocket 更轻量、更简单。

而 sse 这个库,则像一个装备精良的瑞士军刀,不仅提供了 SSE 的核心功能,还贴心地为你准备了持久化、断线重连、失败处理、性能监控等一系列“豪华配置”。它让开发者可以从繁琐的连接管理和异常处理中解放出来,专注于业务逻辑的实现。

所以,下次当你的产品经理再提出“实时更新”的需求时,别再愁眉苦脸地去写轮询了。自信地拍拍胸脯,告诉他:“没问题,分分钟搞定!” 然后,优雅地 import "github.com/go-dev-frame/sponge/pkg/sse",开始你的表演吧!

GitHub地址: Sponge SSE


http://www.dtcms.com/a/263964.html

相关文章:

  • 从docker-compose快速入门Docker
  • VCenter SSL过期,登录提示HTTP 500错误解决办法
  • Linux驱动学习day13(同步与互斥)
  • 记录一次生产环境ActiveMQ无法启动的问题
  • 变幻莫测:CoreData 中 Transformable 类型面面俱到(八)
  • Raspberry Pi 4边缘智能PLC:OpenPLC赋能物联网
  • 25-7-1 论文学习(1)- Fractal Generative Models 何恺明大佬的论文
  • 半导体和PN结
  • 遥感影像岩性分类:基于CNN与CNN-EL集成学习的深度学习方法
  • 胖喵安初 (azi) Android 应用初始化库 (类似 Termux)
  • Adobe AI高效设计技巧与创新思维指南
  • day41简单CNN
  • 注意力得分矩阵求解例子
  • 网站崩溃的幕后黑手:GPTBot爬虫的流量冲击
  • 第七讲~~测试工具(禅道项目管理系统)
  • 【记录】Word|Word创建自动编号的多级列表标题样式
  • poi java 删除word的空白页
  • 【docker】docker save和docker load
  • 通达信【极弱强势指标与股道波段交易系统】幅图
  • Gin 中间件详解与实践
  • 发布/订阅模式:解耦系统的强大设计模式
  • Python Flask 容器化应用链路可观测
  • 基于SSM万华城市货运服务系统的设计与实现
  • 开源模型与商用模型协同开发机制设计
  • Vue基础(19)_Vue内置指令
  • Qt_Creator入门基础知识
  • 基于.Net的Web API 控制器及方法相关注解属性
  • Qt/C++运行报错:exited with code -1073741819
  • scp (Secure Copy Protocol)指令集
  • 向量数据库全面解析:原理、功能与主流产品对比