查看源代码 使用代理进行简单的状态管理

在本指南中,我们将学习如何在多个实体之间保存和共享状态。如果你有之前的编程经验,你可能会想到全局共享变量,但我们将在此学习的模型与此完全不同。接下来的章节将概括这里介绍的概念。

如果你跳过了《入门》指南或很久以前读过,请务必重新阅读进程章节。我们将以此作为起点。

(可变)状态的麻烦

Elixir 是一种不可变语言,默认情况下没有任何东西是共享的。如果我们想共享信息,这些信息可以从多个地方读取和修改,我们在 Elixir 中有两个主要选择

我们在《入门》指南中介绍了进程。ETS(Erlang Term Storage)是一个新主题,我们将在后面的章节中探讨。不过,在涉及到进程时,我们很少手动编写自己的进程,而是使用 Elixir 和 OTP 中提供的抽象

  • Agent — 状态的简单包装器。
  • GenServer — “通用服务器”(进程),封装状态,提供同步和异步调用,支持代码重载等。
  • Task — 异步计算单元,允许生成进程,并在以后可能检索其结果。

我们将在本指南中探索这些抽象中的大多数。请记住,它们都是使用 VM 提供的基本功能(例如 send/2receive/1spawn/1Process.link/1)构建在进程之上的。

在这里,我们将使用代理,并创建一个名为 KV.Bucket 的模块,负责以允许其他进程读取和修改的方式存储我们的键值条目。

代理 101

Agent 是状态的简单包装器。如果你的进程只需要保存状态,代理就是一个很好的选择。让我们在项目中启动一个 iex 会话

$ iex -S mix

并玩一玩代理

iex> {:ok, agent} = Agent.start_link(fn -> [] end)
{:ok, #PID<0.57.0>}
iex> Agent.update(agent, fn list -> ["eggs" | list] end)
:ok
iex> Agent.get(agent, fn list -> list end)
["eggs"]
iex> Agent.stop(agent)
:ok

我们使用一个空列表的初始状态启动了一个代理。我们更新了代理的状态,将新项目添加到列表的开头。Agent.update/3 的第二个参数是一个函数,它接受代理的当前状态作为输入,并返回其所需的新的状态。最后,我们检索了整个列表。Agent.get/3 的第二个参数是一个函数,它接受状态作为输入,并返回 Agent.get/3 本身将返回的值。一旦我们完成了代理,我们可以调用 Agent.stop/3 来终止代理进程。

Agent.update/3 函数接受任何函数作为第二个参数,该函数接收一个参数并返回一个值

iex> {:ok, agent} = Agent.start_link(fn -> [] end)
{:ok, #PID<0.338.0>}
iex> Agent.update(agent, fn _list -> 123 end)
:ok
iex> Agent.update(agent, fn content -> %{a: content} end)
:ok
iex> Agent.update(agent, fn content -> [12 | [content]] end)
:ok
iex> Agent.update(agent, fn list -> [:nop | list] end)
:ok
iex> Agent.get(agent, fn content -> content end)
[:nop, 12, %{a: 123}]

如你所见,我们可以以任何我们想要的方式修改代理状态。因此,我们可能不希望在代码中的许多不同地方访问 Agent API。相反,我们希望将所有与 Agent 相关的功能封装在一个单独的模块中,我们将其称为 KV.Bucket。在我们实现它之前,让我们编写一些测试,这些测试将概述我们的模块公开的 API。

test/kv/bucket_test.exs(记住 .exs 扩展名)处创建一个文件,内容如下

defmodule KV.BucketTest do
  use ExUnit.Case, async: true

  test "stores values by key" do
    {:ok, bucket} = KV.Bucket.start_link([])
    assert KV.Bucket.get(bucket, "milk") == nil

    KV.Bucket.put(bucket, "milk", 3)
    assert KV.Bucket.get(bucket, "milk") == 3
  end
end

use ExUnit.Case 负责为测试设置我们的模块,并导入许多与测试相关的功能,例如 test/2 宏。

我们的第一个测试通过调用 start_link/1 并传递一个空的选项列表来启动一个新的 KV.Bucket。然后我们对它执行一些 get/2put/3 操作,断言结果。

另请注意传递给 ExUnit.Caseasync: true 选项。此选项通过使用我们机器中的多个内核,使测试用例与其他 :async 测试用例并行运行。这对于加快我们的测试套件非常有用。但是,:async 必须在测试用例不依赖于或不改变任何全局值的情况下设置。例如,如果测试需要写入文件系统或访问数据库,请将其保持同步(省略 :async 选项)以避免测试之间的竞争条件。

异步或同步,我们的新测试显然应该失败,因为正在测试的模块中没有实现任何功能

** (UndefinedFunctionError) function KV.Bucket.start_link/1 is undefined (module KV.Bucket is not available)

为了修复失败的测试,让我们在 lib/kv/bucket.ex 处创建一个文件,内容如下。请随时尝试使用代理自己实现 KV.Bucket 模块,然后再查看下面的实现。

defmodule KV.Bucket do
  use Agent

  @doc """
  Starts a new bucket.
  """
  def start_link(_opts) do
    Agent.start_link(fn -> %{} end)
  end

  @doc """
  Gets a value from the `bucket` by `key`.
  """
  def get(bucket, key) do
    Agent.get(bucket, &Map.get(&1, key))
  end

  @doc """
  Puts the `value` for the given `key` in the `bucket`.
  """
  def put(bucket, key, value) do
    Agent.update(bucket, &Map.put(&1, key, value))
  end
end

我们实现的第一步是调用 use Agent。我们将在本指南中学习的大多数功能,例如 GenServerSupervisor,都遵循此模式。对于所有这些功能,调用 use 将生成一个具有默认配置的 child_spec/1 函数,这将在我们第 4 章开始监督进程时非常有用。

然后我们定义一个 start_link/1 函数,它将有效地启动代理。通常定义一个始终接受选项列表的 start_link/1 函数。我们现在不打算使用任何选项,但我们以后可能会使用。然后我们继续调用 Agent.start_link/1,它接收一个匿名函数,该函数返回 Agent 的初始状态。

我们在代理中保存了一个映射来存储我们的键和值。使用 Agent API 和在 《入门》指南 中介绍的捕获运算符 &,在映射中获取和放置值。当 Agent.get/2Agent.update/2 被调用时,代理通过 &1 参数将其状态传递给匿名函数。

现在 KV.Bucket 模块已经定义,我们的测试应该通过!你可以尝试自己运行:mix test

使用 ExUnit 回调进行测试设置

在继续并向 KV.Bucket 添加更多功能之前,让我们谈谈 ExUnit 回调。正如你可能预期的那样,所有 KV.Bucket 测试都需要一个正在运行的桶代理。幸运的是,ExUnit 支持回调,使我们能够跳过这些重复的任务。

让我们重写测试用例以使用回调

defmodule KV.BucketTest do
  use ExUnit.Case, async: true

  setup do
    {:ok, bucket} = KV.Bucket.start_link([])
    %{bucket: bucket}
  end

  test "stores values by key", %{bucket: bucket} do
    assert KV.Bucket.get(bucket, "milk") == nil

    KV.Bucket.put(bucket, "milk", 3)
    assert KV.Bucket.get(bucket, "milk") == 3
  end
end

我们首先使用 setup/1 宏定义了一个设置回调。setup/1 宏定义了一个回调,该回调在每次测试之前运行,在与测试本身相同的进程中。

请注意,我们需要一种机制将 bucket PID 从回调传递给测试。我们通过使用测试上下文来做到这一点。当我们从回调中返回 %{bucket: bucket} 时,ExUnit 会将此映射合并到测试上下文中。由于测试上下文本身就是一个映射,我们可以从中模式匹配桶,在测试中提供对桶的访问权限

test "stores values by key", %{bucket: bucket} do
  # `bucket` is now the bucket from the setup block
end

你可以在 ExUnit.Case 模块文档 中阅读更多关于 ExUnit 用例的信息,并在 ExUnit.Callbacks 中阅读更多关于回调的信息。

其他代理操作

除了获取值和更新代理状态之外,代理还允许我们通过 Agent.get_and_update/2 在一个函数调用中获取值和更新代理状态。让我们实现一个 KV.Bucket.delete/2 函数,该函数从桶中删除一个键,并返回其当前值

@doc """
Deletes `key` from `bucket`.

Returns the current value of `key`, if `key` exists.
"""
def delete(bucket, key) do
  Agent.get_and_update(bucket, &Map.pop(&1, key))
end

现在轮到你为上述功能编写测试了!此外,请务必探索 关于 Agent 模块的文档,以了解有关它们的更多信息。

代理中的客户端/服务器

在我们进入下一章之前,让我们讨论代理中的客户端/服务器二分法。让我们扩展我们刚刚实现的 delete/2 函数

def delete(bucket, key) do
  Agent.get_and_update(bucket, fn dict ->
    Map.pop(dict, key)
  end)
end

传递给代理的函数内部的所有内容都在代理进程中发生。在这种情况下,由于代理进程是接收和响应我们消息的进程,因此我们说代理进程是服务器。函数外部的所有内容都在客户端发生。

这种区别很重要。如果有昂贵的操作要执行,你必须考虑在客户端还是服务器上执行这些操作更好。例如

def delete(bucket, key) do
  Process.sleep(1000) # puts client to sleep
  Agent.get_and_update(bucket, fn dict ->
    Process.sleep(1000) # puts server to sleep
    Map.pop(dict, key)
  end)
end

当在服务器上执行一个长时间操作时,所有对该特定服务器的其他请求都将等待该操作完成,这可能会导致某些客户端超时。

在下一章中,我们将探索 GenServer,在 GenServer 中,客户端和服务器之间的隔离变得更加明显。