查看源代码 使用 GenServer 实现客户端-服务器通信

上一章中,我们使用代理来表示我们的桶。在Mix 简介中,我们指定希望为每个桶命名,以便我们能够执行以下操作

CREATE shopping
OK

PUT shopping milk 1
OK

GET shopping milk
1
OK

在上面的会话中,我们与“购物”桶进行了交互。

由于代理是进程,因此每个桶都有一个进程标识符 (PID),但桶没有名称。回到进程章节,我们了解到可以通过赋予原子名称的方式在 Elixir 中注册进程

iex> Agent.start_link(fn -> %{} end, name: :shopping)
{:ok, #PID<0.43.0>}
iex> KV.Bucket.put(:shopping, "milk", 1)
:ok
iex> KV.Bucket.get(:shopping, "milk")
1

但是,使用原子对动态进程进行命名是一个糟糕的想法!如果我们使用原子,则需要将桶名称(通常从外部客户端接收)转换为原子,并且**我们绝不应该将用户输入转换为原子**。这是因为原子不会被垃圾回收。创建原子后,它永远不会被回收。从用户输入生成原子意味着用户可以注入足够多的不同名称来耗尽我们的系统内存!

实际上,在内存不足之前,您更有可能达到 Erlang VM 的最大原子数量限制,这将导致您的系统无论如何都会崩溃。

与其滥用内置的命名机制,我们将创建自己的进程注册表,将桶名称与桶进程关联起来。

注册表需要保证其始终是最新的。例如,如果某个桶进程由于错误而崩溃,注册表必须注意到此更改并避免提供过时的条目。在 Elixir 中,我们说注册表需要监控每个桶。由于我们的注册表需要能够接收和处理来自系统的临时消息,因此Agent API 不足以满足需求。

我们将使用GenServer 来创建一个可以监控桶进程的注册表进程。GenServer 为在 Elixir 和 OTP 中构建服务器提供了工业级强大的功能。

如果您还没有,请阅读GenServer 模块文档以获得概述。完成之后,我们就可以继续了。

GenServer 回调

GenServer 是一个进程,它在特定条件下调用有限的函数集。当我们使用Agent 时,我们将客户端代码和服务器代码并排放置,如下所示

def put(bucket, key, value) do
  Agent.update(bucket, &Map.put(&1, key, value))
end

让我们稍微分解一下上面的代码

def put(bucket, key, value) do
  # Here is the client code
  Agent.update(bucket, fn state ->
    # Here is the server code
    Map.put(state, key, value)
  end)
  # Back to the client code
end

在上面的代码中,我们有一个进程,我们称之为“客户端”,它向代理(“服务器”)发送请求。该请求包含一个匿名函数,必须由服务器执行。

在 GenServer 中,上面的代码将是两个独立的函数,大致如下所示

def put(bucket, key, value) do
  # Send the server a :put "instruction"
  GenServer.call(bucket, {:put, key, value})
end

# Server callback

def handle_call({:put, key, value}, _from, state) do
  {:reply, :ok, Map.put(state, key, value)}
end

GenServer 代码中还有相当多的仪式,但正如我们将看到的,它也带来了一些好处。

现在,我们将仅编写用于桶注册逻辑的服务器回调,而不会提供适当的 API,这将在稍后进行。

lib/kv/registry.ex中创建一个新文件,内容如下

defmodule KV.Registry do
  use GenServer

  ## Missing Client API - will add this later

  ## Defining GenServer Callbacks

  @impl true
  def init(:ok) do
    {:ok, %{}}
  end

  @impl true
  def handle_call({:lookup, name}, _from, names) do
    {:reply, Map.fetch(names, name), names}
  end

  @impl true
  def handle_cast({:create, name}, names) do
    if Map.has_key?(names, name) do
      {:noreply, names}
    else
      {:ok, bucket} = KV.Bucket.start_link([])
      {:noreply, Map.put(names, name, bucket)}
    end
  end
end

您可以向 GenServer 发送两种类型的请求:调用和广播。调用是同步的,服务器**必须**将响应发送回此类请求。在服务器计算响应时,客户端处于**等待**状态。广播是异步的:服务器不会发送响应,因此客户端不会等待响应。这两种请求都是发送到服务器的消息,并将按顺序处理。在上面的实现中,我们对:create 消息进行模式匹配,以将其作为广播进行处理,并对:lookup 消息进行模式匹配,以将其作为调用进行处理。

为了调用上面的回调,我们需要通过相应的GenServer 函数。让我们启动一个注册表,创建一个名为桶,然后查找它

iex> {:ok, registry} = GenServer.start_link(KV.Registry, :ok)
{:ok, #PID<0.136.0>}
iex> GenServer.cast(registry, {:create, "shopping"})
:ok
iex> {:ok, bk} = GenServer.call(registry, {:lookup, "shopping"})
{:ok, #PID<0.174.0>}

我们的KV.Registry 进程接收了一个包含{:create, "shopping"} 的广播和一个包含{:lookup, "shopping"} 的调用,顺序为:GenServer.cast 将在消息发送到registry 后立即返回。另一方面,GenServer.call 是我们将在上面KV.Registry.handle_call 回调提供的答案处等待的地方。

您可能还注意到,我们在每个回调之前添加了@impl true@impl true 告知编译器,我们对后续函数定义的意图是定义回调。如果我们偶然地在函数名称或参数数量方面犯了错误,例如我们定义了handle_call/2,编译器会警告我们没有handle_call/2 要定义,并会给出GenServer 模块的所有已知回调的完整列表。

这一切都很好,但我们仍然希望为用户提供一个 API,使我们能够隐藏实现细节。

客户端 API

GenServer 在两部分中实现:客户端 API 和服务器回调。您可以将这两部分合并到单个模块中,也可以将它们分别放入客户端模块和服务器模块中。客户端是调用客户端函数的任何进程。服务器始终是我们将在客户端 API 中显式传递为参数的进程标识符或进程名称。这里,我们将使用单个模块来处理服务器回调和客户端 API。

编辑lib/kv/registry.ex 中的文件,填写客户端 API 的空白部分

  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `server`.

  Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
  """
  def lookup(server, name) do
    GenServer.call(server, {:lookup, name})
  end

  @doc """
  Ensures there is a bucket associated with the given `name` in `server`.
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

第一个函数是start_link/1,它启动一个新的 GenServer,并传递一个选项列表。start_link/1 调用GenServer.start_link/3,它接受三个参数

  1. 实现服务器回调的模块,在本例中为__MODULE__(表示当前模块)

  2. 初始化参数,在本例中为原子:ok

  3. 选项列表,可用于指定服务器名称等内容。现在,我们将我们在start_link/1 上接收的选项列表转发到GenServer.start_link/3

接下来的两个函数lookup/2create/2 负责将这些请求发送到服务器。在本例中,我们分别使用了{:lookup, name}{:create, name}。请求通常指定为元组,如本例所示,以便在第一个参数槽中提供多个“参数”。通常将请求的操作指定为元组的第一个元素,并将该操作的参数指定在剩余的元素中。请注意,请求必须与handle_call/3handle_cast/2 的第一个参数匹配。

这就是客户端 API 的全部内容。在服务器端,我们可以实现各种回调来保证服务器的初始化、终止和请求处理。这些回调是可选的,现在,我们只实现了我们关心的回调。让我们回顾一下。

第一个是init/1 回调,它接收传递给GenServer.start_link/3 的第二个参数,并返回{:ok, state},其中 state 是一个新的映射。我们已经可以注意到GenServer API 如何使客户端/服务器分离更加明显。start_link/3 发生在客户端,而init/1 是在服务器上运行的相应回调。

对于call/2 请求,我们实现了一个handle_call/3 回调,它接收request、我们接收请求的进程(_from)和当前服务器状态(names)。handle_call/3 回调以{:reply, reply, new_state} 格式返回一个元组。元组的第一个元素:reply 表示服务器应将响应发送回客户端。第二个元素reply 是将发送到客户端的内容,而第三个元素new_state 是新的服务器状态。

对于cast/2 请求,我们实现了一个handle_cast/2 回调,它接收request 和当前服务器状态(names)。handle_cast/2 回调以{:noreply, new_state} 格式返回一个元组。请注意,在实际应用程序中,我们可能已经使用同步调用而不是异步广播实现了:create 的回调。我们这样做是为了说明如何实现广播回调。

handle_call/3handle_cast/2 回调可以返回其他元组格式。还有其他回调,如terminate/2code_change/3,我们可以实现这些回调。欢迎您探索完整的GenServer 文档以详细了解这些回调。

现在,让我们编写一些测试来保证我们的 GenServer 按预期工作。

测试 GenServer

测试 GenServer 与测试代理没有什么不同。我们将在设置回调中生成服务器,并在整个测试过程中使用它。在test/kv/registry_test.exs 中创建一个文件,内容如下

defmodule KV.RegistryTest do
  use ExUnit.Case, async: true

  setup do
    registry = start_supervised!(KV.Registry)
    %{registry: registry}
  end

  test "spawns buckets", %{registry: registry} do
    assert KV.Registry.lookup(registry, "shopping") == :error

    KV.Registry.create(registry, "shopping")
    assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    KV.Bucket.put(bucket, "milk", 1)
    assert KV.Bucket.get(bucket, "milk") == 1
  end
end

我们的测试用例首先断言我们的注册表中没有桶,创建一个名为桶,查找它,并断言它像桶一样工作。

我们为KV.Registry 编写的setup 块与我们为KV.Bucket 编写的setup 块之间存在一个重要区别。我们没有通过调用KV.Registry.start_link/1 手动启动注册表,而是调用了ExUnit.Callbacks.start_supervised!/2 函数,并传递了KV.Registry 模块。

start_supervised! 函数是由use ExUnit.Case 插入到我们的测试模块中的。它通过调用KV.Registry 模块的start_link/1 函数来启动KV.Registry 进程。使用start_supervised! 的优点是,ExUnit 将保证注册表进程在下一个测试开始**之前**关闭。换句话说,它有助于保证一个测试的状态不会干扰下一个测试,以防它们依赖于共享资源。

在测试期间启动进程时,我们应该始终优先使用 start_supervised!。 我们建议您更改 bucket_test.exs 中的 setup 块以使用 start_supervised!

运行测试,它们应该全部通过!

监控的必要性

到目前为止,我们所做的一切都可以用 Agent 来实现。 在本节中,我们将看到使用 GenServer 可以实现的许多事情中的一件,而使用 Agent 无法实现。

让我们从一个测试开始,描述如果一个桶停止或崩溃,我们希望注册表如何表现。

test "removes buckets on exit", %{registry: registry} do
  KV.Registry.create(registry, "shopping")
  {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
  Agent.stop(bucket)
  assert KV.Registry.lookup(registry, "shopping") == :error
end

上面的测试将在最后一个断言处失败,因为即使我们停止了桶进程,桶名称仍然保留在注册表中。

为了修复此错误,我们需要注册表监视它生成的每个桶。 一旦我们设置了监视器,注册表将在每次桶进程退出时收到通知,从而允许我们清理注册表。

让我们首先通过使用 iex -S mix 启动一个新的控制台来玩一下监视器。

iex> {:ok, pid} = KV.Bucket.start_link([])
{:ok, #PID<0.66.0>}
iex> Process.monitor(pid)
#Reference<0.0.0.551>
iex> Agent.stop(pid)
:ok
iex> flush()
{:DOWN, #Reference<0.0.0.551>, :process, #PID<0.66.0>, :normal}

注意 Process.monitor(pid) 返回一个唯一的引用,它允许我们将即将到来的消息与该监控引用匹配。 在我们停止代理之后,我们可以 flush/0 所有消息并注意到一条 :DOWN 消息到达,带有 monitor 返回的精确引用,通知桶进程以 :normal 原因退出。

让我们重新实现服务器回调以修复错误并使测试通过。 首先,我们将修改 GenServer 状态为两个字典:一个包含 name -> pid,另一个包含 ref -> name。 然后我们需要在 handle_cast/2 上监视桶,以及实现一个 handle_info/2 回调来处理监控消息。 完整的服务器回调实现如下所示

## Server callbacks

@impl true
def init(:ok) do
  names = %{}
  refs = %{}
  {:ok, {names, refs}}
end

@impl true
def handle_call({:lookup, name}, _from, state) do
  {names, _} = state
  {:reply, Map.fetch(names, name), state}
end

@impl true
def handle_cast({:create, name}, {names, refs}) do
  if Map.has_key?(names, name) do
    {:noreply, {names, refs}}
  else
    {:ok, bucket} = KV.Bucket.start_link([])
    ref = Process.monitor(bucket)
    refs = Map.put(refs, ref, name)
    names = Map.put(names, name, bucket)
    {:noreply, {names, refs}}
  end
end

@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
  {name, refs} = Map.pop(refs, ref)
  names = Map.delete(names, name)
  {:noreply, {names, refs}}
end

@impl true
def handle_info(msg, state) do
  require Logger
  Logger.debug("Unexpected message in KV.Registry: #{inspect(msg)}")
  {:noreply, state}
end

观察到,我们能够在不更改任何客户端 API 的情况下,大幅度地更改服务器实现。 这就是明确隔离服务器和客户端的好处之一。

最后,与其他回调不同,我们为 handle_info/2 定义了一个“全匹配”子句,该子句会丢弃并记录任何未知消息。 为了理解原因,让我们继续下一节。

callcastinfo

到目前为止,我们已经使用了三个回调:handle_call/3handle_cast/2handle_info/2。 以下是我们在决定何时使用每个回调时应该考虑的因素。

  1. handle_call/3 必须用于同步请求。 这应该是默认的选择,因为等待服务器回复是一种有用的反压机制。

  2. handle_cast/2 必须用于异步请求,当您不关心回复时。 投递不保证服务器已收到消息,因此应谨慎使用。 例如,我们在本章中定义的 create/2 函数应该使用 call/2。 我们出于教学目的使用了 cast/2

  3. handle_info/2 必须用于服务器可能接收到的所有其他消息,这些消息不是通过 GenServer.call/2GenServer.cast/2 发送的,包括使用 send/2 发送的常规消息。 监控 :DOWN 消息就是一个例子。

由于任何消息(包括通过 send/2 发送的消息)都将进入 handle_info/2,因此可能会出现意外消息到达服务器。 因此,如果我们没有定义全匹配子句,这些消息可能会导致我们的注册表崩溃,因为没有子句匹配。 但是,我们不必担心 handle_call/3handle_cast/2 的此类情况。 调用和投递只能通过 GenServer API 进行,因此未知消息很可能是一个开发人员错误。

为了帮助开发人员记住 call、cast 和 info 之间的区别、支持的返回值等等,我们有一个小型 GenServer 速查表

我们之前在 进程章节 中了解了链接。 现在,随着注册表完成,您可能想知道:我们何时应该使用监视器,何时应该使用链接?

链接是双向的。 如果您将两个进程链接起来,其中一个进程崩溃了,另一个进程也会崩溃(除非它正在捕获退出)。 监视器是单向的:只有监视进程才会收到有关被监视进程的通知。 换句话说:当您想要链接崩溃时使用链接,当您只想被告知崩溃、退出等等时使用监视器。

回到我们的 handle_cast/2 实现,您可以看到注册表正在同时链接和监视桶。

{:ok, bucket} = KV.Bucket.start_link([])
ref = Process.monitor(bucket)

这是一个不好的主意,因为我们不希望注册表在桶崩溃时崩溃。 正确的修复实际上是不将桶链接到注册表。 相反,我们将每个桶链接到一种特殊类型的进程,称为监管器,它们专门设计用于处理故障和崩溃。 我们将在下一章中了解有关它们的更多信息。