当前位置: 首页 > news >正文

Elixir websocket客户端

最近用到了 websocket 客户端,来介绍几个 elixir 的 websocket 客户端。排名不分先后,上榜全看心情。

WebSocketx

文档:https://hexdocs.pm/websockex/readme.html

这是一个类似 GenServer 的 websocket 库,用法上也类似 GenServer。简单来说我们只需要定义一个模块实现 WebSocketx 行为,然后运行这个进程就可以了。

首先我们用 mix new client_demo --sup 创建一个 Mix 工程,然后在 mix.exs 中添加依赖。

def deps do[{:websockex, "~> 0.4.3"}]
end

最后使用 mix deps.get 下载依赖。

WebSocketx 提供了 startstart_link 函数来连接到服务器,它们的区别是后者会将客户端进程连接到当前进程,这个 GenServer 也是一样的。第一个参数是服务器地址,第二个参数是实现 WebSocketx 回调的模块,第三个参数是客户端状态。

defmodule ClientDemo douse WebSockex@url "http://localhost:8080"def start_link(_opts \\ []) doWebSockex.start_link(@url, __MODULE__, [], name: __MODULE__)end
end

然后我们将客户端添加到监督树中就可以运行了,这就是最简单的例子了。

children = [{ClientDemo, []}
]

使用 iex -S mix 运行工程,就可以连接到服务器了。

WebSocketx 提供的回调可以分为两类,一类和 WebSocket 有关的回调

  • handle_connect/2 → 连接建立之后调用。
  • handle_disconnect/2 → 连接断开时调用,如服务器下线。
  • handle_frame/2 → 处理 websocket 消息。
  • handle_ping/2 → 收到 ping 消息时调用。
  • handle_pong/2 → 收到 pong 消息时调用。

handle_connect/2 没啥好说的,正常返回 {:ok, new_state} 就好了,如果不需要在建立连接时特殊处理,也可以不实现它。

handle_disconnect/2 有三种返回值,对应了三种处理情况。

  1. 第一种是直接返回 {:ok, state} 结束连接;
  2. 第二种情况是返回 {:reconnect, state} 尝试重连;
  3. 第三种情况是返回 {:reconnect, conn, state} 进行重连,与第二种的区别是它会用 conn 中的信息进行重连,比如换个IP或端口什么的,重试其他节点,避免在一棵树上吊死。

handle_frame/2 是接收并处理 websocket 消息的地方,它的第一个参数是数据帧 frame()

frame() :::ping| :pong| {:ping | :pong, nil | (message :: binary())}| {:text | :binary, message :: binary()}

常用的是 {:text, msg}{:binary, msg},分别对应 websocket 中的两种消息类型:文本消息和二进制消息。

它的返回值对应着三种处理状态:

  1. 返回 {:ok, new_state} 什么也不做;
  2. 返回 {:reply, frame(), new_state} 给服务端回复消息;
  3. 返回 {:close, new_state}{:close, close_frame(), new_state} 关闭连接, close_frame() 是 一个关闭码和消息的元组,比如 {100, "param error"}

第二类回调是 GenServer 的回调,前面我们说了它和 GenServer 是非常类似的。

  • handle_cast/2
  • handle_info/2
  • format_status/2
  • terminate/2
  • code_change/3

除了前两个,其他都和 GenServer 是差不多的。在 handle_cast/2handle_info/2 回调中,除了返回 {:ok, state},还可以返回 {:reply, frame(), new_state} 给服务端发送消息,或者返回 {:close, new_state}{:close, close_frame(), new_state} 关闭连接。

下面是一个较为完整的例子:

defmodule ClientDemo douse WebSockex@url "http://localhost:8080"def start_link(_opts \\ []) doWebSockex.start_link(@url, __MODULE__, [], name: __MODULE__)end# apidef send_text(text) doWebSockex.cast(__MODULE__, {:send, {:text, text}})enddef send_frame(text) doWebSockex.send_frame(__MODULE__, {:text, text})end# behaviour implementation@impl WebSockexdef handle_connect(conn, state) doIO.puts("connected to server: #{conn.host}"){:ok, state}end@impl WebSockexdef handle_disconnect(_connection_status_map, state) doIO.puts("**disconnected**"){:ok, state}end@impl WebSockexdef handle_frame({type, msg}, state) doIO.puts("Received Message - Type: #{inspect(type)} -- Message: #{inspect(msg)}"){:ok, state}end@impl WebSockexdef handle_cast({:send, {type, msg} = frame}, state) doIO.puts("Sending #{type} frame with payload: #{msg}"){:reply, frame, state}end
end

WebSockex 同样提供了 cast/2 函数来实现异步调用,最终会调用 handle_cast 回调。除了通过返回 {:reply, frame, state} 来向服务端发送消息,WebSockex 还提供了 send_frame/2 函数直接向服务端发送消息。

Fresh

文档:https://hexdocs.pm/fresh/readme.html

首先添加依赖:

defp deps do[{:fresh, "~> 0.4.4"}]
end

这是一个基于 Mint 生态的 websocket 库,Mint 稍后我们也会介绍。它的用法也和 GenServer 类似,依然是三板斧: use Fresh,实现回调,加入监督树。

Fresh 的启动方式有两种,一种是像前面的 WebSocketx 一样,在模块中自己写一个 start_link 函数,在函数中调用 Fresh.start_link/4 函数,最后将模块加入监督树。

defmodule WsClient.FreshClient douse Fresh@url "ws://localhost:8080/"def start_link(_opts \\ []) doFresh.start_link(@url, __MODULE__, [], name: {:local, __MODULE__})end
end

然后在监督树中加入这个模块。

children = [{WsClient.FreshClient, []}
]

但是由于 Fresh 的 __using__ 宏也提供了 start_link 函数。所以,其实我们也可以直接用它提供的 start_link 函数,而不用自己实现,只是注册时会稍微复杂一点,需要提供更多配置参数。

children = [{WsClient.FreshClient,uri: "ws://localhost:8080/", state: [], opts: [name: {:local, WsClient.FreshClient}]}
]

无论时使用哪种方式,都有两个需要注意的地方。一是连接地址的协议必须是 wswss,不能是 http,虽然在 WebSocketx 中是可以的。二是注册进程 pid 时,名称得是 {:local, __MODULE__},不能直接写 name: __MODULE__

它提供的回调函数也可以分为两类,第一类是和 websocket 相关的回调。

  • handle_connect/3 → 连接成功后调用。
  • handle_disconnect/3 → 连接断开时调用。
  • handle_error/2 → 发生错误时调用。
  • handle_in/2 → 处理 websocket 消息。
  • handle_control/2 → 处理控制类消息,如 ping/pong。

handle_connect/3handle_in/2handle_control/2 返回的是 generic_handle_response() 类型:

@type generic_handle_response() ::{:ok, state()}| {:reply, Mint.WebSocket.frame() | [Mint.WebSocket.frame()], state()}| {:close, code :: non_neg_integer(), reason :: binary(), state()}

:ok 表示什么也不做, :reply 向服务端回复消息, :close 关闭连接。

handle_disconnect/3handle_error/2 返回的是 connection_handle_response() 类型:

@type connection_handle_response() ::{:ignore, state()}| {:reconnect, initial :: state()}| {:close, reason :: term()}| :reconnect| :close

:ignore 表示什么也不做, :reconnect 表示重连, :close 表示关闭连接。

第二类是和进程本身相关的回调,只有两个。

  • handle_info/2
  • handle_terminate/2 → 退出时调用。

handle_info/2 返回的也是 generic_handle_response() 类型,所以通过它也可以向服务端发送消息或者关闭连接。

除了 start/4start_link/4 函数,Fresh 还提供了另外三个和 websocket 有关的函数:

  • send/2 → 直接向服务端发送 websocket 消息。
  • close/3 → 关闭连接。
  • open?/1 → 判断连接是否可用。

下面是一个比较完整的例子:

defmodule WsClient.FreshClient douse Fresh@url "ws://localhost:8080/"def start_link(_opts \\ []) doFresh.start_link(@url, __MODULE__, [], name: {:local, __MODULE__})end# apidef send_msg(text) doFresh.send(__MODULE__, {:text, text})end# behaviour implementationdef handle_connect(_status, _headers, _state) doIO.puts("Start counting from 0"){:reply, {:text, "1"}, 0}enddef handle_disconnect(_code, _reason, state) doIO.puts("**disconnected**"){:ignore, state}enddef handle_in({:text, number}, _state) donumber = String.to_integer(number)IO.puts("Number: #{number}"){:reply, {:text, "#{number + 1}"}, number}enddef handle_error(_error, state) doIO.puts("Oh, shit!"){:ignore, state}end
end

Mint.WebSocket

文档:https://hexdocs.pm/mint_web_socket/Mint.WebSocket.html

首先添加依赖:

defp deps do[{:mint_web_socket, "~> 1.0"}]
end

Mint 是一个函数式的 HTTP 客户端库,Mint.WebSocket 是 Mint 的 websocket 库,它也继承了 Mint 的函数式体质,因此需要我们自己来管理状态,用起来会稍显麻烦。

这里给一个例子稍微感受下:

def hello do# bootstrapIO.puts("> connect to server"){:ok, conn} = Mint.HTTP.connect(:http, "localhost", 8080)IO.puts("> upgrade connection"){:ok, conn, ref} = Mint.WebSocket.upgrade(:ws, conn, "/", [])http_get_message = receive(do: (message -> message)){:ok, conn, [{:status, ^ref, status}, {:headers, ^ref, resp_headers}, {:done, ^ref}]} =Mint.WebSocket.stream(conn, http_get_message){:ok, conn, websocket} = Mint.WebSocket.new(conn, ref, status, resp_headers)IO.puts("> send msg to server")# send the hello world frame{:ok, websocket, data} = Mint.WebSocket.encode(websocket, {:text, "hello world"}){:ok, conn} = Mint.WebSocket.stream_request_body(conn, ref, data)IO.puts("> receive msg from server")# receive the hello world reply framehello_world_echo_message = receive(do: (message -> message)){:ok, conn, [{:data, ^ref, data}]} = Mint.WebSocket.stream(conn, hello_world_echo_message){:ok, websocket, [{:text, "hello world"}]} = Mint.WebSocket.decode(websocket, data)
end

由于函数式的不变性,连接状态需要我们自己管理,然后在 API 之间传递,和前面的例子相比,除了处理业务逻辑,还需要管理连接状态。当然我们也可以将状态都封装到一个 GenServer 进程里,不过那样的话,使用 Fresh 库难道不会更香吗!

虽然看起来比较啰嗦,但大致逻辑并不复杂,我们不妨梳理下上面的例子:

  1. 使用 Mint.HTTP.connect 连接到服务器;

  2. 使用 Mint.WebSocket.upgrade 将 http 连接升级为 websocket 连接;

  3. 使用 receive 接收服务端返回的消息(升级回复);

  4. 使用 Mint.WebSocket.stream 解析 http 响应,它只是 Mint.HTTP.stream 的封装,这里解析出来的响应是一个元组列表:

    [{:status, ^ref, status}, {:headers, ^ref, resp_headers}, {:done, ^ref}
    ]
    

    这里我们只是用模式匹配检查升级成功即可;

  5. 使用 Mint.WebSocket.new 封装成 WebSocket 连接;

  6. 使用 Mint.WebSocket.encode 编码 websocket 消息;

  7. 使用 Mint.WebSocket.stream_request_body 发送消息给服务端;

  8. 使用 receive 接收服务端响应;

  9. 使用 Mint.WebSocket.stream 解析 http 响应;

  10. 使用 Mint.WebSocket.decode 从 http 响应中解码出 websocket 消息。

WebSocket 连接建立之后,后面的流程就是重复发送和接送消息的流程。

Gun

文档:https://ninenines.eu/docs/en/gun/2.2/guide/

添加依赖:

defp deps do[{:gun, "~> 2.2"}]
end

这其实是一个 erlang 的 http 客户端库,也支持 websocket 连接,他也比较底层,使用流程上和 Mint.WebSocket 类似,但是 API 要简洁一点。

还是上面的例子,我们换 gun 来写:

def world() do{:ok, connPid} = :gun.open(~c"localhost", 8080){:ok, protocol} = :gun.await_up(connPid)IO.inspect(protocol)streamRef = :gun.ws_upgrade(connPid, "/")# IO.inspect(streamRef){:gun_upgrade, ^connPid, ^streamRef, ["websocket"], _headers} =receive(do: (message -> message)):gun.ws_send(connPid, streamRef, {:text, "hello!!!"}){:gun_ws, ^connPid, ^streamRef, frame} = receive(do: (message -> message))IO.inspect(frame)
end

步骤还是一样:

  1. 使用 :gun.open 连接服务器;
  2. 使用 :gun.await_up 等待升级;
  3. 使用 :gun.ws_upgrade 升级成 websocket 连接;
  4. 使用 receive 接收服务端返回的消息,判断升级是否成功;
  5. 使用 :gun.ws_send 发送消息给服务端;
  6. 使用 receive 接收服务的回复。

可以看到相比于 Mint.WebSocket 步骤确实少了一点,主要是发送和接收消息要简单一点。其实我们也可以用 GenServer 来封装 :gun,因为 :gun 比较简单,所以我们就用它来演示了,要封装 Min.WebSocket 也是一样的步骤,只是用到的 API 有所不同。

defmodule WsClient.GunClient douse GenServerdef start_link(_opts \\ []) doGenServer.start_link(__MODULE__, {~c"localhost", 8080}, name: __MODULE__)end# apidef send(:text, text) doGenServer.cast(__MODULE__, {:send, {:text, text}})end# behaviour implementationdef init({host, port}) do{:ok, pid} = :gun.open(~c"localhost", 8080){:ok, protocol} = :gun.await_up(pid)stream_ref = :gun.ws_upgrade(pid, "/"){:ok, %{pid: pid, protocol: protocol, ref: stream_ref, host: host, port: port}}end# send messagedef handle_cast({:send, {:text, text}}, %{pid: pid, ref: ref} = state) do:gun.ws_send(pid, ref, {:text, text}){:noreply, state}enddef handle_info({:gun_upgrade, _pid, _ref, ["websocket"], _headers}, state) doIO.puts("connet succed!"){:noreply, state}end# receive messagedef handle_info({:gun_ws, _, _, frame}, state) doIO.inspect(frame)# call :gun.send if need{:noreply, state}enddef handle_info({:gun_down, _pid, :ws, :closed, _ref}, state) doIO.puts("server closed."){:stop, "server closed", state}end
end

看,是不是也很简单!注意这里我们并没有做重试,如果服务端挂掉,客户端进程也就结束了。


以上就是 Elixir 中我觉得比较奈斯的 websocket 客户端库,一般情况下,选择 WebSocketx 或者 Fresh 就好了,使用起来比较简单,功能也很完善。当然,得益于 GenServer,自己动手封装也是可以的。

http://www.dtcms.com/a/592608.html

相关文章:

  • uniapp+coze制作app智能体
  • linux gpio子系统学习
  • 前端基础——CSS练习项目:百度热榜实现
  • Java基础——集合进阶5
  • 没有网站怎么做cpa广告ps网站建设
  • 百度怎么注册自己的网站最有设计感的网站
  • 黑马程序员苍穹外卖(新手)Day1
  • 主从服务器的正反向声明
  • 一步一步学习使用FireMonkey动画() 用实例理解动画的运行状态
  • KUKA机械臂submit解释器将当前位置发送给C#上位机
  • 网站后台密码在哪个文件wordpress网页的源代码在哪里
  • 54_AI智能体运维部署之搭建Prometheus服务器:构建企业级监控基础设施
  • 【GitHub每日速递 】MCP 生态新工具!Registry 服务器注册服务预览版,AI 开发者部署认证全流程揭秘
  • 91、使用昇腾服务器构建FRP服务器,支持算能盒子访问
  • Vue 中实现 PDF 文件上传
  • 配置dns主从服务。要求从服务器能够定时从主服务器同步数据。
  • 中英文网站源码php网站开发8080无法访问此页面
  • 零基础如何在安服公司培训班成为网络安全工程师(黑客)
  • Oracle空间函数ST_AsText配置
  • 关系数据理论
  • 卫星姿态控制模式全解析:从基准到任务的体系化分类
  • 在百度seo快速收录要求是什么 有哪些
  • 一维前缀和与二维前缀和算法介绍及使用
  • Qwen多模态模型全解析
  • 做彩票网站要多少钱中山企业门户网站建设
  • 淘宝店铺全量商品接口实战:分类穿透采集与增量同步的技术方案
  • 【Linux】从基础到精通:内核调试与模块开发进阶之路
  • 高端品销售网站whois查询 站长工具
  • Diffusion Models与视频超分(3): 解读当前最快和最强的开源模型FlashVSR
  • 【Linux】进程间通信(二)命名管道(FIFO)实战指南:从指令操作到面向对象封装的进程间通信实现