查看源代码 Phoenix.Socket 行为 (Phoenix v1.7.14)
一个通过通道多路复用消息的套接字实现。
Phoenix.Socket
用作在客户端和服务器之间建立连接的模块。连接建立后,初始状态将存储在 Phoenix.Socket
结构体中。
同一个套接字可用于接收来自不同传输层的事件。Phoenix 支持 websocket
和 longpoll
选项,在您的端点中调用 Phoenix.Endpoint.socket/3
时。 websocket
默认设置,longpoll
也可以显式配置。
socket "/socket", MyAppWeb.Socket, websocket: true, longpoll: false
上面的命令意味着可以通过 WebSocket 连接进行传入的套接字连接。传入和传出的事件按主题路由到通道
channel "room:lobby", MyAppWeb.LobbyChannel
有关通道的更多信息,请参见 Phoenix.Channel
。
套接字行为
套接字处理程序在端点中安装,并且必须定义两个回调
connect/3
- 接收套接字参数、连接信息(如果有)并对连接进行身份验证。必须返回一个Phoenix.Socket
结构体,通常带有自定义分配id/1
- 接收connect/3
返回的套接字,并将此连接的 ID 返回为字符串。id
用于标识套接字连接,通常用于特定用户,允许我们强制断开连接。对于不需要身份验证的套接字,可以返回nil
示例
defmodule MyAppWeb.UserSocket do
use Phoenix.Socket
channel "room:*", MyAppWeb.RoomChannel
def connect(params, socket, _connect_info) do
{:ok, assign(socket, :user_id, params["user_id"])}
end
def id(socket), do: "users_socket:#{socket.assigns.user_id}"
end
# Disconnect all user's socket connections and their multiplexed channels
MyAppWeb.Endpoint.broadcast("users_socket:" <> user.id, "disconnect", %{})
套接字字段
:id
- 套接字的字符串 ID:assigns
- 套接字分配的映射,默认值:%{}
:channel
- 当前通道模块:channel_pid
- 通道进程 ID:endpoint
- 此套接字起源的端点模块,例如:MyAppWeb.Endpoint
:handler
- 此套接字起源的套接字模块,例如:MyAppWeb.UserSocket
:joined
- 如果套接字已有效加入通道:join_ref
- 客户端加入时发送的引用:ref
- 客户端发送的最新引用:pubsub_server
- 套接字的发布/订阅服务器的注册名称:topic
- 字符串主题,例如"room:123"
:transport
- 传输层的标识符,用于记录:transport_pid
- 套接字传输进程的进程 ID:serializer
- 套接字消息的序列化器
使用选项
在 use Phoenix.Socket
上,接受以下选项
:log
- 记录套接字操作的默认级别。默认值为:info
。可以设置为false
以禁用它:partitions
- 每个通道都在一个主管之下生成。此选项控制将生成多少个主管来处理通道。默认值为内核数量。
垃圾回收
在处理大型消息后,可以在传输进程中强制执行垃圾回收。例如,要从您的通道触发这种操作,请运行
send(socket.transport_pid, :garbage_collect)
或者,您可以配置您的端点套接字以通过设置 WebSockets 的 :fullsweep_after
选项来更频繁地触发更多完整清除垃圾回收。有关更多信息,请参见 Phoenix.Endpoint.socket/3
。
客户端-服务器通信
服务器数据的编码和客户端数据的解码是根据 Phoenix.Socket.Serializer
中定义的序列化器完成的。默认情况下,JSON 编码用于将消息传递到客户端和从客户端传递到客户端。
序列化器 decode!
函数必须返回一个 Phoenix.Socket.Message
,该消息将转发到通道,但以下情况除外
- "phoenix" 主题中的
"heartbeat"
事件 - 只应发出一个 OK 答复 - 任何主题上的
"phx_join"
- 应加入该主题 - 任何主题上的
"phx_leave"
- 应离开该主题
每条消息还具有一个 ref
字段,用于跟踪响应。
服务器可以发送消息或答复。对于消息,ref 唯一标识该消息。对于答复,ref 与原始消息匹配。这两种数据类型还包含一个 join_ref,它唯一标识当前加入的通道。
Phoenix.Socket
实现还可以发送特殊消息和答复
"phx_error"
- 如果发生错误,例如通道进程崩溃或尝试加入已加入的通道"phx_close"
- 通道已正常关闭
Phoenix 附带了 WebSocket 和长轮询的 JavaScript 实现,它们与 Phoenix.Socket 交互,可以用作对有兴趣实现自定义客户端的人员的参考。
自定义套接字和传输
有关编写不利用通道或编写与其他套接字交互的自定义传输的套接字的更多信息,请参见 Phoenix.Socket.Transport
文档。
自定义通道
您可以将任何模块列为通道,只要它实现一个 child_spec/1
函数即可。 child_spec/1
函数接收调用方作为参数,它必须返回一个初始化进程的子项规范。
进程初始化后,它将收到以下消息
{Phoenix.Channel, auth_payload, from, socket}
自定义通道实现必须在初始化期间使用自定义 reply_payload
调用 GenServer.reply(from, {:ok | :error, reply_payload})
,该自定义 reply_payload
将作为答复发送给客户端。如果未能执行此操作,套接字将永远被阻塞。
自定义通道接收 Phoenix.Socket.Message
结构体作为来自传输层的常规消息。可以通过构建适当的 Phoenix.Socket.Reply
和 Phoenix.Socket.Message
结构体,使用序列化器对其进行编码并将序列化后的结果分派到传输层,在任何时候将这些消息的答复和自定义消息发送到套接字。
例如,要处理 "phx_leave" 消息(建议由所有通道实现处理),可以执行以下操作
def handle_info(
%Message{topic: topic, event: "phx_leave"} = message,
%{topic: topic, serializer: serializer, transport_pid: transport_pid} = socket
) do
send transport_pid, serializer.encode!(build_leave_reply(message))
{:stop, {:shutdown, :left}, socket}
end
传递给所有通道的特殊消息是带有事件 "phx_drain" 的广播,在应用程序关闭期间清空套接字时发送。通常,它是通过向传输层发送排水消息来处理的,这会导致传输层关闭
def handle_info(
%Broadcast{event: "phx_drain"},
%{transport_pid: transport_pid} = socket
) do
send(transport_pid, :socket_drain)
{:stop, {:shutdown, :draining}, socket}
end
我们还建议所有通道在 init
上监视 transport_pid
,并在传输层退出时退出。我们还建议将 :normal
退出原因(通常是由于套接字关闭)重写为 {:shutdown, :closed}
,以保证通道退出时链接断开(因为 :normal
退出不会断开链接)
def handle_info({:DOWN, _, _, transport_pid, reason}, %{transport_pid: transport_pid} = socket) do
reason = if reason == :normal, do: {:shutdown, :closed}, else: reason
{:stop, reason, socket}
end
除非在关闭之前向套接字发送 {:socket_close, pid, reason}
消息,否则任何进程退出都被套接字层视为错误。
自定义通道实现无法使用 Phoenix.ChannelTest
进行测试。
摘要
类型
@type t() :: %Phoenix.Socket{ assigns: map(), channel: atom(), channel_pid: pid(), endpoint: atom(), handler: atom(), id: String.t() | nil, join_ref: term(), joined: boolean(), private: map(), pubsub_server: atom(), ref: term(), serializer: atom(), topic: String.t(), transport: atom(), transport_pid: pid() }
回调
connect/3
的快捷方式版本,不接收 connect_info
。
为了向后兼容提供。
@callback connect(params :: map(), t(), connect_info :: map()) :: {:ok, t()} | {:error, term()} | :error
接收套接字参数并对连接进行身份验证。
套接字参数和分配
套接字参数从客户端传递,可用于验证和身份验证用户。验证后,您可以将默认分配放入套接字中,这些分配将为所有通道设置,即
{:ok, assign(socket, :user_id, verified_user_id)}
要拒绝连接,请返回 :error
或 {:error, term}
。要控制在这种情况下客户端收到的响应,请 在 WebSocket 配置中定义错误处理程序。
有关在连接时执行令牌验证的示例,请参见 Phoenix.Token
文档。
标识套接字连接。
套接字 ID 是允许您识别给定用户的全部套接字的主题
def id(socket), do: "users_socket:#{socket.assigns.user_id}"
将允许您广播一个 "disconnect"
事件并终止给定用户的全部活动套接字和通道
MyAppWeb.Endpoint.broadcast("users_socket:" <> user.id, "disconnect", %{})
返回 nil
会使此套接字匿名。
函数
将键值对添加到套接字分配中。
可以传递单个键值对,也可以提供关键字列表或分配映射以合并到现有的套接字分配中。
示例
iex> assign(socket, :name, "Elixir")
iex> assign(socket, name: "Elixir", logo: "💧")
定义与给定主题和传输层匹配的通道。
topic_pattern
- 字符串模式,例如"room:*"
、"users:*"
或"system"
module
- 频道模块处理程序,例如MyAppWeb.RoomChannel
opts
- 可选的选项列表,见下文
选项
:assigns
- 在加入时合并到套接字中的套接字分配映射
示例
channel "topic1:*", MyChannel
主题模式
channel
宏接受两种类型的主题模式。作为最后一个字符提供的星号(*
字符)参数表示 "topic:subtopic"
匹配。如果提供了一个简单的字符串,则只有该主题将与频道处理程序匹配。大多数用例将使用 "topic:*"
模式来允许更灵活的主题范围。
有关更多信息,请参见 Phoenix.Channel