Elixir websocket客户端
最近用到了 websocket 客户端,来介绍几个 elixir 的 websocket 客户端。排名不分先后,上榜全看心情。
WebSocketx
文档:https://hexdocs.pm/websockex/readme.html
这是一个类似 GenServer 的 websocket 库,用法上也类似 GenServer。简单来说我们只需要定义一个模块实现 WebSocketx 行为,然后运行这个进程就可以了。
首先我们用 mix new client_demo --sup 创建一个 Mix 工程,然后在 mix.exs 中添加依赖。
def deps do[{:websockex, "~> 0.4.3"}]
end
最后使用 mix deps.get 下载依赖。
WebSocketx 提供了 start 和 start_link 函数来连接到服务器,它们的区别是后者会将客户端进程连接到当前进程,这个 GenServer 也是一样的。第一个参数是服务器地址,第二个参数是实现 WebSocketx 回调的模块,第三个参数是客户端状态。
defmodule ClientDemo douse WebSockex@url "http://localhost:8080"def start_link(_opts \\ []) doWebSockex.start_link(@url, __MODULE__, [], name: __MODULE__)end
end
然后我们将客户端添加到监督树中就可以运行了,这就是最简单的例子了。
children = [{ClientDemo, []}
]
使用 iex -S mix 运行工程,就可以连接到服务器了。
WebSocketx 提供的回调可以分为两类,一类和 WebSocket 有关的回调
handle_connect/2→ 连接建立之后调用。handle_disconnect/2→ 连接断开时调用,如服务器下线。handle_frame/2→ 处理 websocket 消息。handle_ping/2→ 收到 ping 消息时调用。handle_pong/2→ 收到 pong 消息时调用。
handle_connect/2 没啥好说的,正常返回 {:ok, new_state} 就好了,如果不需要在建立连接时特殊处理,也可以不实现它。
handle_disconnect/2 有三种返回值,对应了三种处理情况。
- 第一种是直接返回
{:ok, state}结束连接; - 第二种情况是返回
{:reconnect, state}尝试重连; - 第三种情况是返回
{:reconnect, conn, state}进行重连,与第二种的区别是它会用conn中的信息进行重连,比如换个IP或端口什么的,重试其他节点,避免在一棵树上吊死。
handle_frame/2 是接收并处理 websocket 消息的地方,它的第一个参数是数据帧 frame() 。
frame() :::ping| :pong| {:ping | :pong, nil | (message :: binary())}| {:text | :binary, message :: binary()}
常用的是 {:text, msg} 或 {:binary, msg},分别对应 websocket 中的两种消息类型:文本消息和二进制消息。
它的返回值对应着三种处理状态:
- 返回
{:ok, new_state}什么也不做; - 返回
{:reply, frame(), new_state}给服务端回复消息; - 返回
{:close, new_state}或{:close, close_frame(), new_state}关闭连接,close_frame()是 一个关闭码和消息的元组,比如{100, "param error"}。
第二类回调是 GenServer 的回调,前面我们说了它和 GenServer 是非常类似的。
handle_cast/2handle_info/2format_status/2terminate/2code_change/3
除了前两个,其他都和 GenServer 是差不多的。在 handle_cast/2 和 handle_info/2 回调中,除了返回 {:ok, state},还可以返回 {:reply, frame(), new_state} 给服务端发送消息,或者返回 {:close, new_state} 或 {:close, close_frame(), new_state} 关闭连接。
下面是一个较为完整的例子:
defmodule ClientDemo douse WebSockex@url "http://localhost:8080"def start_link(_opts \\ []) doWebSockex.start_link(@url, __MODULE__, [], name: __MODULE__)end# apidef send_text(text) doWebSockex.cast(__MODULE__, {:send, {:text, text}})enddef send_frame(text) doWebSockex.send_frame(__MODULE__, {:text, text})end# behaviour implementation@impl WebSockexdef handle_connect(conn, state) doIO.puts("connected to server: #{conn.host}"){:ok, state}end@impl WebSockexdef handle_disconnect(_connection_status_map, state) doIO.puts("**disconnected**"){:ok, state}end@impl WebSockexdef handle_frame({type, msg}, state) doIO.puts("Received Message - Type: #{inspect(type)} -- Message: #{inspect(msg)}"){:ok, state}end@impl WebSockexdef handle_cast({:send, {type, msg} = frame}, state) doIO.puts("Sending #{type} frame with payload: #{msg}"){:reply, frame, state}end
end
WebSockex 同样提供了 cast/2 函数来实现异步调用,最终会调用 handle_cast 回调。除了通过返回 {:reply, frame, state} 来向服务端发送消息,WebSockex 还提供了 send_frame/2 函数直接向服务端发送消息。
Fresh
文档:https://hexdocs.pm/fresh/readme.html
首先添加依赖:
defp deps do[{:fresh, "~> 0.4.4"}]
end
这是一个基于 Mint 生态的 websocket 库,Mint 稍后我们也会介绍。它的用法也和 GenServer 类似,依然是三板斧: use Fresh,实现回调,加入监督树。
Fresh 的启动方式有两种,一种是像前面的 WebSocketx 一样,在模块中自己写一个 start_link 函数,在函数中调用 Fresh.start_link/4 函数,最后将模块加入监督树。
defmodule WsClient.FreshClient douse Fresh@url "ws://localhost:8080/"def start_link(_opts \\ []) doFresh.start_link(@url, __MODULE__, [], name: {:local, __MODULE__})end
end
然后在监督树中加入这个模块。
children = [{WsClient.FreshClient, []}
]
但是由于 Fresh 的 __using__ 宏也提供了 start_link 函数。所以,其实我们也可以直接用它提供的 start_link 函数,而不用自己实现,只是注册时会稍微复杂一点,需要提供更多配置参数。
children = [{WsClient.FreshClient,uri: "ws://localhost:8080/", state: [], opts: [name: {:local, WsClient.FreshClient}]}
]
无论时使用哪种方式,都有两个需要注意的地方。一是连接地址的协议必须是 ws 或 wss,不能是 http,虽然在 WebSocketx 中是可以的。二是注册进程 pid 时,名称得是 {:local, __MODULE__},不能直接写 name: __MODULE__ 。
它提供的回调函数也可以分为两类,第一类是和 websocket 相关的回调。
handle_connect/3→ 连接成功后调用。handle_disconnect/3→ 连接断开时调用。handle_error/2→ 发生错误时调用。handle_in/2→ 处理 websocket 消息。handle_control/2→ 处理控制类消息,如 ping/pong。
handle_connect/3, handle_in/2 和 handle_control/2 返回的是 generic_handle_response() 类型:
@type generic_handle_response() ::{:ok, state()}| {:reply, Mint.WebSocket.frame() | [Mint.WebSocket.frame()], state()}| {:close, code :: non_neg_integer(), reason :: binary(), state()}
:ok 表示什么也不做, :reply 向服务端回复消息, :close 关闭连接。
handle_disconnect/3 和 handle_error/2 返回的是 connection_handle_response() 类型:
@type connection_handle_response() ::{:ignore, state()}| {:reconnect, initial :: state()}| {:close, reason :: term()}| :reconnect| :close
:ignore 表示什么也不做, :reconnect 表示重连, :close 表示关闭连接。
第二类是和进程本身相关的回调,只有两个。
handle_info/2handle_terminate/2→ 退出时调用。
handle_info/2 返回的也是 generic_handle_response() 类型,所以通过它也可以向服务端发送消息或者关闭连接。
除了 start/4 和 start_link/4 函数,Fresh 还提供了另外三个和 websocket 有关的函数:
send/2→ 直接向服务端发送 websocket 消息。close/3→ 关闭连接。open?/1→ 判断连接是否可用。
下面是一个比较完整的例子:
defmodule WsClient.FreshClient douse Fresh@url "ws://localhost:8080/"def start_link(_opts \\ []) doFresh.start_link(@url, __MODULE__, [], name: {:local, __MODULE__})end# apidef send_msg(text) doFresh.send(__MODULE__, {:text, text})end# behaviour implementationdef handle_connect(_status, _headers, _state) doIO.puts("Start counting from 0"){:reply, {:text, "1"}, 0}enddef handle_disconnect(_code, _reason, state) doIO.puts("**disconnected**"){:ignore, state}enddef handle_in({:text, number}, _state) donumber = String.to_integer(number)IO.puts("Number: #{number}"){:reply, {:text, "#{number + 1}"}, number}enddef handle_error(_error, state) doIO.puts("Oh, shit!"){:ignore, state}end
end
Mint.WebSocket
文档:https://hexdocs.pm/mint_web_socket/Mint.WebSocket.html
首先添加依赖:
defp deps do[{:mint_web_socket, "~> 1.0"}]
end
Mint 是一个函数式的 HTTP 客户端库,Mint.WebSocket 是 Mint 的 websocket 库,它也继承了 Mint 的函数式体质,因此需要我们自己来管理状态,用起来会稍显麻烦。
这里给一个例子稍微感受下:
def hello do# bootstrapIO.puts("> connect to server"){:ok, conn} = Mint.HTTP.connect(:http, "localhost", 8080)IO.puts("> upgrade connection"){:ok, conn, ref} = Mint.WebSocket.upgrade(:ws, conn, "/", [])http_get_message = receive(do: (message -> message)){:ok, conn, [{:status, ^ref, status}, {:headers, ^ref, resp_headers}, {:done, ^ref}]} =Mint.WebSocket.stream(conn, http_get_message){:ok, conn, websocket} = Mint.WebSocket.new(conn, ref, status, resp_headers)IO.puts("> send msg to server")# send the hello world frame{:ok, websocket, data} = Mint.WebSocket.encode(websocket, {:text, "hello world"}){:ok, conn} = Mint.WebSocket.stream_request_body(conn, ref, data)IO.puts("> receive msg from server")# receive the hello world reply framehello_world_echo_message = receive(do: (message -> message)){:ok, conn, [{:data, ^ref, data}]} = Mint.WebSocket.stream(conn, hello_world_echo_message){:ok, websocket, [{:text, "hello world"}]} = Mint.WebSocket.decode(websocket, data)
end
由于函数式的不变性,连接状态需要我们自己管理,然后在 API 之间传递,和前面的例子相比,除了处理业务逻辑,还需要管理连接状态。当然我们也可以将状态都封装到一个 GenServer 进程里,不过那样的话,使用 Fresh 库难道不会更香吗!
虽然看起来比较啰嗦,但大致逻辑并不复杂,我们不妨梳理下上面的例子:
-
使用
Mint.HTTP.connect连接到服务器; -
使用
Mint.WebSocket.upgrade将 http 连接升级为 websocket 连接; -
使用
receive接收服务端返回的消息(升级回复); -
使用
Mint.WebSocket.stream解析 http 响应,它只是Mint.HTTP.stream的封装,这里解析出来的响应是一个元组列表:[{:status, ^ref, status}, {:headers, ^ref, resp_headers}, {:done, ^ref} ]这里我们只是用模式匹配检查升级成功即可;
-
使用
Mint.WebSocket.new封装成 WebSocket 连接; -
使用
Mint.WebSocket.encode编码 websocket 消息; -
使用
Mint.WebSocket.stream_request_body发送消息给服务端; -
使用
receive接收服务端响应; -
使用
Mint.WebSocket.stream解析 http 响应; -
使用
Mint.WebSocket.decode从 http 响应中解码出 websocket 消息。
WebSocket 连接建立之后,后面的流程就是重复发送和接送消息的流程。
Gun
文档:https://ninenines.eu/docs/en/gun/2.2/guide/
添加依赖:
defp deps do[{:gun, "~> 2.2"}]
end
这其实是一个 erlang 的 http 客户端库,也支持 websocket 连接,他也比较底层,使用流程上和 Mint.WebSocket 类似,但是 API 要简洁一点。
还是上面的例子,我们换 gun 来写:
def world() do{:ok, connPid} = :gun.open(~c"localhost", 8080){:ok, protocol} = :gun.await_up(connPid)IO.inspect(protocol)streamRef = :gun.ws_upgrade(connPid, "/")# IO.inspect(streamRef){:gun_upgrade, ^connPid, ^streamRef, ["websocket"], _headers} =receive(do: (message -> message)):gun.ws_send(connPid, streamRef, {:text, "hello!!!"}){:gun_ws, ^connPid, ^streamRef, frame} = receive(do: (message -> message))IO.inspect(frame)
end
步骤还是一样:
- 使用
:gun.open连接服务器; - 使用
:gun.await_up等待升级; - 使用
:gun.ws_upgrade升级成 websocket 连接; - 使用
receive接收服务端返回的消息,判断升级是否成功; - 使用
:gun.ws_send发送消息给服务端; - 使用
receive接收服务的回复。
可以看到相比于 Mint.WebSocket 步骤确实少了一点,主要是发送和接收消息要简单一点。其实我们也可以用 GenServer 来封装 :gun,因为 :gun 比较简单,所以我们就用它来演示了,要封装 Min.WebSocket 也是一样的步骤,只是用到的 API 有所不同。
defmodule WsClient.GunClient douse GenServerdef start_link(_opts \\ []) doGenServer.start_link(__MODULE__, {~c"localhost", 8080}, name: __MODULE__)end# apidef send(:text, text) doGenServer.cast(__MODULE__, {:send, {:text, text}})end# behaviour implementationdef init({host, port}) do{:ok, pid} = :gun.open(~c"localhost", 8080){:ok, protocol} = :gun.await_up(pid)stream_ref = :gun.ws_upgrade(pid, "/"){:ok, %{pid: pid, protocol: protocol, ref: stream_ref, host: host, port: port}}end# send messagedef handle_cast({:send, {:text, text}}, %{pid: pid, ref: ref} = state) do:gun.ws_send(pid, ref, {:text, text}){:noreply, state}enddef handle_info({:gun_upgrade, _pid, _ref, ["websocket"], _headers}, state) doIO.puts("connet succed!"){:noreply, state}end# receive messagedef handle_info({:gun_ws, _, _, frame}, state) doIO.inspect(frame)# call :gun.send if need{:noreply, state}enddef handle_info({:gun_down, _pid, :ws, :closed, _ref}, state) doIO.puts("server closed."){:stop, "server closed", state}end
end
看,是不是也很简单!注意这里我们并没有做重试,如果服务端挂掉,客户端进程也就结束了。
以上就是 Elixir 中我觉得比较奈斯的 websocket 客户端库,一般情况下,选择 WebSocketx 或者 Fresh 就好了,使用起来比较简单,功能也很完善。当然,得益于 GenServer,自己动手封装也是可以的。
