查看源代码 分布式任务和标签

在本章中,我们将回到 :kv 应用程序,并添加一个路由层,允许我们根据桶名称在节点之间分配请求。

路由层将接收以下格式的路由表

[
  {?a..?m, :"foo@computer-name"},
  {?n..?z, :"bar@computer-name"}
]

路由器将检查桶名称的第一个字节与表匹配,并根据该字节将请求调度到相应的节点。例如,以字母 "a" 开头的桶(?a 表示字母 "a" 的 Unicode 码点)将被调度到节点 foo@computer-name

如果匹配项指向正在评估请求的节点,那么我们就完成了路由,该节点将执行请求的操作。如果匹配项指向另一个节点,我们将把请求传递给该节点,该节点将查看它自己的路由表(它可能与第一个节点的路由表不同)并采取相应措施。如果没有任何匹配项,则会引发错误。

注意:我们将在本章中使用同一台机器上的两个节点。您可以在同一网络上的两台(或更多)不同的机器上使用,但您需要做一些准备工作。首先,您需要确保所有机器都具有一个 ~/.erlang.cookie 文件,其值完全相同。然后,您需要确保 epmd 正在一个未被阻止的端口上运行(您可以运行 epmd -d 以获取调试信息)。

我们的第一个分布式代码

Elixir 带有用于连接节点并在它们之间交换信息的工具。实际上,我们在分布式环境中工作时使用相同的进程、消息传递和接收消息的概念,因为 Elixir 进程是位置透明的。这意味着,在发送消息时,接收者进程位于同一节点还是另一个节点并不重要,VM 都能够在两种情况下传递消息。

为了运行分布式代码,我们需要使用一个名称启动 VM。该名称可以是短名称(在同一网络中)或长名称(需要完整计算机地址)。让我们启动一个新的 IEx 会话

$ iex --sname foo

现在您可以看到提示略有不同,显示了节点名称,后跟计算机名称

Interactive Elixir - press Ctrl+C to exit (type h() ENTER for help)
iex(foo@jv)1>

我的计算机名为 jv,因此我在上面的示例中看到的是 foo@jv,但您将得到不同的结果。在以下示例中,我们将使用 foo@computer-name,您在尝试代码时应该相应地更新它们。

让我们在这个 shell 中定义一个名为 Hello 的模块

iex> defmodule Hello do
...>   def world, do: IO.puts "hello world"
...> end

如果您在同一网络上还有另一台安装了 Erlang 和 Elixir 的计算机,您可以在其上启动另一个 shell。如果您没有,您可以在另一个终端中启动另一个 IEx 会话。无论哪种情况,都给它一个名为 bar 的短名称

$ iex --sname bar

请注意,在这个新的 IEx 会话中,我们无法访问 Hello.world/0

iex> Hello.world
** (UndefinedFunctionError) function Hello.world/0 is undefined (module Hello is not available)
    Hello.world()

但是,我们可以从 bar@computer-namefoo@computer-name 上生成一个新进程!让我们试一试(其中 @computer-name 是您在本地看到的那个)

iex> Node.spawn_link(:"foo@computer-name", fn -> Hello.world() end)
#PID<9014.59.0>
hello world

Elixir 在另一个节点上生成了一个进程并返回了它的 PID。然后代码在另一个节点上执行,该节点存在 Hello.world/0 函数,并调用了该函数。请注意,"hello world" 的结果是在当前节点 bar 上打印的,而不是在 foo 上。换句话说,要打印的消息是从 foo 发送回 bar 的。这是因为在另一个节点(foo)上生成的进程知道所有输出应该发送回原始节点!

我们可以像往常一样从 Node.spawn_link/2 返回的 PID 发送和接收消息。让我们尝试一个简单的 ping-pong 示例

iex> pid = Node.spawn_link(:"foo@computer-name", fn ->
...>   receive do
...>     {:ping, client} -> send(client, :pong)
...>   end
...> end)
#PID<9014.59.0>
iex> send(pid, {:ping, self()})
{:ping, #PID<0.73.0>}
iex> flush()
:pong
:ok

从我们的快速探索中,我们可以得出结论,我们应该使用 Node.spawn_link/2 在每次需要进行分布式计算时在远程节点上生成进程。但是,我们在这本指南中已经了解到,应该尽可能避免在监督树之外生成进程,因此我们需要寻找其他选项。

在我们的实现中,Node.spawn_link/2 有三个更好的替代方案

  1. 我们可以使用 Erlang 的 :erpc 模块在远程节点上执行函数。在上面的 bar@computer-name shell 中,您可以调用 :erpc.call(:"foo@computer-name", Hello, :world, []),它将打印 "hello world"

  2. 我们可以在另一个节点上运行一个服务器,并通过 GenServer API 向该节点发送请求。例如,您可以通过使用 GenServer.call({name, node}, arg) 或传递远程进程 PID 作为第一个参数来调用远程节点上的服务器

  3. 我们可以使用 任务,我们在 上一章 中已经学习过,因为它们可以在本地和远程节点上生成

以上选项具有不同的属性。GenServer 会将您的请求序列化到单个服务器上,而任务实际上是在远程节点上异步运行的,唯一的序列化点是主管执行的生成。

对于我们的路由层,我们将使用任务,但您可以随意探索其他替代方案。

async/await

到目前为止,我们已经探索了以隔离的方式启动和运行的任务,不考虑它们的返回值。但是,有时运行一个任务来计算一个值并在以后读取它的结果很有用。为此,任务还提供了 async/await 模式

task = Task.async(fn -> compute_something_expensive() end)
res  = compute_something_else()
res + Task.await(task)

async/await 提供了一种非常简单的机制来并发计算值。不仅如此,async/await 还可以与我们之前章节中使用过的相同 Task.Supervisor 一起使用。我们只需要调用 Task.Supervisor.async/2 而不是 Task.Supervisor.start_child/2,并使用 Task.await/2 稍后读取结果。

分布式任务

分布式任务与受监督的任务完全相同。唯一的区别是我们在主管上生成任务时传递节点名称。打开 :kv 应用程序中的 lib/kv/supervisor.ex。让我们在树的最后一个子节点中添加一个任务主管

{Task.Supervisor, name: KV.RouterTasks},

现在,让我们再次启动两个命名节点,但在 :kv 应用程序中

$ iex --sname foo -S mix
$ iex --sname bar -S mix

bar@computer-name 内部,我们现在可以通过主管直接在另一个节点上生成一个任务

iex> task = Task.Supervisor.async({KV.RouterTasks, :"foo@computer-name"}, fn ->
...>   {:ok, node()}
...> end)
%Task{
  mfa: {:erlang, :apply, 2},
  owner: #PID<0.122.0>,
  pid: #PID<12467.88.0>,
  ref: #Reference<0.0.0.400>
}
iex> Task.await(task)
{:ok, :"foo@computer-name"}

我们的第一个分布式任务检索任务正在运行的节点的名称。请注意,我们向 Task.Supervisor.async/2 提供了一个匿名函数,但在分布式情况下,最好显式地提供模块、函数和参数

iex> task = Task.Supervisor.async({KV.RouterTasks, :"foo@computer-name"}, Kernel, :node, [])
%Task{
  mfa: {Kernel, :node, 0},
  owner: #PID<0.122.0>,
  pid: #PID<12467.89.0>,
  ref: #Reference<0.0.0.404>
}
iex> Task.await(task)
:"foo@computer-name"

不同之处在于,匿名函数要求目标节点具有与调用者完全相同的代码版本。使用模块、函数和参数更加健壮,因为您只需要在给定模块中找到一个具有匹配元数的函数。

有了这些知识,我们终于可以编写路由代码了。

路由层

lib/kv/router.ex 中创建一个文件,内容如下

defmodule KV.Router do
  @doc """
  Dispatch the given `mod`, `fun`, `args` request
  to the appropriate node based on the `bucket`.
  """
  def route(bucket, mod, fun, args) do
    # Get the first byte of the binary
    first = :binary.first(bucket)

    # Try to find an entry in the table() or raise
    entry =
      Enum.find(table(), fn {enum, _node} ->
        first in enum
      end) || no_entry_error(bucket)

    # If the entry node is the current node
    if elem(entry, 1) == node() do
      apply(mod, fun, args)
    else
      {KV.RouterTasks, elem(entry, 1)}
      |> Task.Supervisor.async(KV.Router, :route, [bucket, mod, fun, args])
      |> Task.await()
    end
  end

  defp no_entry_error(bucket) do
    raise "could not find entry for #{inspect bucket} in table #{inspect table()}"
  end

  @doc """
  The routing table.
  """
  def table do
    # Replace computer-name with your local machine name
    [{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
  end
end

让我们编写一个测试来验证我们的路由器是否正常工作。创建一个名为 test/kv/router_test.exs 的文件,其中包含

defmodule KV.RouterTest do
  use ExUnit.Case, async: true

  test "route requests across nodes" do
    assert KV.Router.route("hello", Kernel, :node, []) ==
             :"foo@computer-name"
    assert KV.Router.route("world", Kernel, :node, []) ==
             :"bar@computer-name"
  end

  test "raises on unknown entries" do
    assert_raise RuntimeError, ~r/could not find entry/, fn ->
      KV.Router.route(<<0>>, Kernel, :node, [])
    end
  end
end

第一个测试调用 Kernel.node/0,该函数根据桶名称 "hello" 和 "world" 返回当前节点的名称。根据我们迄今为止的路由表,我们应该分别得到 foo@computer-namebar@computer-name 作为响应。

第二个测试检查代码是否为未知条目引发异常。

为了运行第一个测试,我们需要运行两个节点。进入 apps/kv,让我们重新启动名为 bar 的节点,该节点将被测试使用。

$ iex --sname bar -S mix

现在使用以下命令运行测试

$ elixir --sname foo -S mix test

测试应该通过。

测试过滤器和标签

尽管我们的测试通过了,但我们的测试结构变得更加复杂。特别是,仅使用 mix test 运行测试会导致我们的套件出现故障,因为我们的测试需要连接到另一个节点。

幸运的是,ExUnit 带有一个用于标记测试的功能,允许我们根据这些标记运行特定的回调甚至完全过滤测试。我们已经在上一章中使用过 :capture_log 标记,其语义由 ExUnit 本身指定。

这次,让我们在 test/kv/router_test.exs 中添加一个 :distributed 标记

@tag :distributed
test "route requests across nodes" do

编写 @tag :distributed 等效于编写 @tag distributed: true

对测试进行适当标记后,我们现在可以检查该节点是否在网络上处于活动状态,如果不在,我们可以排除所有分布式测试。打开 :kv 应用程序中的 test/test_helper.exs 并添加以下内容

exclude =
  if Node.alive?(), do: [], else: [distributed: true]

ExUnit.start(exclude: exclude)

现在使用 mix test 运行测试

$ mix test
Excluding tags: [distributed: true]

.......

Finished in 0.05 seconds
9 tests, 0 failures, 1 excluded

这次所有测试都通过了,ExUnit 警告我们分布式测试被排除在外。如果您使用 $ elixir --sname foo -S mix test 运行测试,则应该运行一项额外的测试并成功通过,只要 bar@computer-name 节点可用。

命令 mix test 还允许我们动态地包含和排除标记。例如,我们可以运行 $ mix test --include distributed 来运行分布式测试,而不考虑在 test/test_helper.exs 中设置的值。我们还可以传递 --exclude 来从命令行中排除特定标记。最后,可以使用 --only 来仅运行具有特定标记的测试

$ elixir --sname foo -S mix test --only distributed

您可以在 ExUnit.Case 模块文档中详细了解过滤器、标记和默认标记。

连接所有内容

现在我们的路由系统已经到位,让我们更改 KVServer 以使用路由器。将 KVServer.Command 中的 lookup/2 函数从以下内容

defp lookup(bucket, callback) do
  case KV.Registry.lookup(KV.Registry, bucket) do
    {:ok, pid} -> callback.(pid)
    :error -> {:error, :not_found}
  end
end

更改为以下内容

defp lookup(bucket, callback) do
  case KV.Router.route(bucket, KV.Registry, :lookup, [KV.Registry, bucket]) do
    {:ok, pid} -> callback.(pid)
    :error -> {:error, :not_found}
  end
end

与其直接查找注册表,我们现在使用路由器来匹配特定节点。然后我们得到一个 pid,它可以来自我们集群中的任何进程。从现在起,GETPUTDELETE 请求都会被路由到相应的节点。

我们还要确保在创建新的桶时,它会出现在正确的节点上。将 KVServer.Command 中的 run/1 函数(与 :create 命令匹配的函数)替换为以下内容

def run({:create, bucket}) do
  case KV.Router.route(bucket, KV.Registry, :create, [KV.Registry, bucket]) do
    pid when is_pid(pid) -> {:ok, "OK\r\n"}
    _ -> {:error, "FAILED TO CREATE BUCKET"}
  end
end

现在,如果您运行测试,您会看到一个检查服务器交互的现有测试会失败,因为它会尝试使用路由表。为了解决此故障,更改 :kv_server 应用程序的 test_helper.exs,就像我们对 :kv 所做的那样,并为该测试添加 @tag :distributed

@tag :distributed
test "server interaction", %{socket: socket} do

但是,请记住,通过使测试分布式,我们可能不太频繁地运行它,因为我们可能不会在每次测试运行时都进行分布式设置。我们将在下一章学习如何解决这个问题,方法是学习如何使路由表可配置。

总结

我们只是触及了分布式所能实现的皮毛。

在所有示例中,我们都依赖于 Erlang 的能力,即在有请求时自动连接节点。例如,当我们调用 Node.spawn_link(:"foo@computer-name", fn -> Hello.world() end) 时,Erlang 会自动连接到该节点并启动一个新进程。但是,您可能还想采用更明确的方法来连接,方法是使用 Node.connect/1Node.disconnect/1

默认情况下,Erlang 建立一个完全网状网络,这意味着所有节点都相互连接。在这种拓扑结构下,Erlang 分布式系统已知可以扩展到同一个集群中的数十个节点。Erlang 还具有隐藏节点的概念,它可以允许开发人员组装自定义拓扑结构,如 Partisan 项目中所示。

在生产环境中,您可能会有节点在任何时间进行连接和断开连接。在这种情况下,您需要提供节点可发现性。像 libclusterdns_cluster 这样的库提供了多种使用 DNS、Kubernetes 等来实现节点可发现性的策略。

在现实生活中使用的分布式键值存储需要考虑节点可能在任何时间上下线,以及跨节点迁移桶。此外,桶通常需要在节点之间复制,因此一个节点的故障不会导致整个桶丢失。这个过程称为复制。我们的实现不会尝试解决这些问题。相反,我们假设节点数量是固定的,因此使用固定的路由表。

这些主题乍一看可能令人生畏,但请记住,大多数 Elixir 框架会为您抽象这些问题。例如,当使用 Phoenix Web 框架 时,它的即插即用抽象会负责发送消息并跟踪用户加入和离开集群的方式。但是,如果您对分布式系统感兴趣,还有很多东西可以探索。以下是一些额外的参考资料

您还将在整个 Erlang 生态系统中找到许多用于构建分布式系统的库。现在,是时候回到我们简单的分布式键值存储,学习如何为生产环境配置和打包它了。