查看源代码 监督动态子进程

我们现在成功定义了我们的监督者,它会作为我们应用程序生命周期的一部分自动启动(和停止)。

但是,请记住,我们的 KV.Registryhandle_cast/2 回调中同时链接(通过 start_link)和监控(通过 monitor)桶进程。

{:ok, bucket} = KV.Bucket.start_link([])
ref = Process.monitor(bucket)

链接是双向的,这意味着桶中的崩溃会使注册表崩溃。虽然我们现在有监督者,它保证注册表会恢复运行,但注册表崩溃仍然意味着我们丢失了所有将桶名称与其各自进程相关联的数据。

换句话说,我们希望即使桶崩溃,注册表也能继续运行。让我们编写一个新的注册表测试

test "removes bucket on crash", %{registry: registry} do
  KV.Registry.create(registry, "shopping")
  {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

  # Stop the bucket with non-normal reason
  Agent.stop(bucket, :shutdown)
  assert KV.Registry.lookup(registry, "shopping") == :error
end

该测试类似于“在退出时移除桶”,不同之处在于我们发送 :shutdown 作为退出原因而不是 :normal,从而更加严格。如果进程以 :normal 以外的原因终止,所有链接的进程都会收到 EXIT 信号,导致链接的进程也终止,除非它正在捕获退出。

由于桶终止,注册表也停止了,当尝试使用 GenServer.call/3 时,我们的测试失败了。

  1) test removes bucket on crash (KV.RegistryTest)
     test/kv/registry_test.exs:26
     ** (exit) exited in: GenServer.call(#PID<0.148.0>, {:lookup, "shopping"}, 5000)
         ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
     code: assert KV.Registry.lookup(registry, "shopping") == :error
     stacktrace:
       (elixir) lib/gen_server.ex:770: GenServer.call/3
       test/kv/registry_test.exs:33: (test)

我们将通过定义一个新的监督者来解决这个问题,该监督者将生成并监督所有桶。与我们之前定义的监督者相反,子进程不是预先知道的,而是动态启动的。对于这些情况,我们使用一个专门针对此类用例的监督者,称为 DynamicSupervisorDynamicSupervisor 在初始化期间不期望子进程列表;相反,每个子进程都是通过 DynamicSupervisor.start_child/2 手动启动的。

桶监督者

由于 DynamicSupervisor 在初始化期间没有定义任何子进程,因此 DynamicSupervisor 也允许我们跳过定义一个包含通常的 start_link 函数和 init 回调的单独模块的工作。相反,我们可以通过给它一个名称和一个策略来直接在监督树中定义一个 DynamicSupervisor

打开 lib/kv/supervisor.ex 并添加动态监督者作为子进程,如下所示

  def init(:ok) do
    children = [
      {KV.Registry, name: KV.Registry},
      {DynamicSupervisor, name: KV.BucketSupervisor, strategy: :one_for_one}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

请记住,进程的名称可以是任何原子。到目前为止,我们已将进程命名为与定义其实现的模块相同的名称。例如,由 KV.Registry 定义的进程被赋予了 KV.Registry 的进程名称。这只是一个约定:如果以后系统中出现错误,例如“名为 KV.Registry 的进程以原因崩溃”,我们就知道确切要调查的地方。

在这种情况下,没有模块,所以我们选择了 KV.BucketSupervisor 这个名称。它可以是任何其他名称。我们还选择了 :one_for_one 策略,这是目前动态监督者唯一可用的策略。

运行 iex -S mix,这样我们就可以尝试使用我们的动态监督者

iex> {:ok, bucket} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
{:ok, #PID<0.72.0>}
iex> KV.Bucket.put(bucket, "eggs", 3)
:ok
iex> KV.Bucket.get(bucket, "eggs")
3

DynamicSupervisor.start_child/2 需要监督者的名称和要启动的子进程的子进程规范。

最后一步是更改注册表以使用动态监督者

  def handle_cast({:create, name}, {names, refs}) do
    if Map.has_key?(names, name) do
      {:noreply, {names, refs}}
    else
      {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
      ref = Process.monitor(pid)
      refs = Map.put(refs, ref, name)
      names = Map.put(names, name, pid)
      {:noreply, {names, refs}}
    end
  end

这足以使我们的测试通过,但我们的应用程序中存在资源泄漏。当桶终止时,监督者将在其位置启动一个新的桶。毕竟,这是监督者的作用!

但是,当监督者重新启动新的桶时,注册表并不知道它。所以我们将在监督者中有一个空的桶,没有人可以访问!为了解决这个问题,我们希望说桶实际上是临时的。如果它们崩溃,无论原因如何,都不应重新启动它们。

我们可以通过在 KV.Bucket 中将 restart: :temporary 选项传递给 use Agent 来实现这一点

defmodule KV.Bucket do
  use Agent, restart: :temporary

让我们还在 test/kv/bucket_test.exs 中添加一个测试,以保证桶是临时的

  test "are temporary workers" do
    assert Supervisor.child_spec(KV.Bucket, []).restart == :temporary
  end

我们的测试使用 Supervisor.child_spec/2 函数从模块中检索子进程规范,然后断言其重启值为 :temporary。此时,您可能想知道为什么如果它从不重新启动其子进程,为什么要使用监督者?事实证明,监督者提供的不仅仅是重启,它们还负责保证正确的启动和关闭,尤其是在监督树崩溃的情况下。

监督树

当我们将 KV.BucketSupervisor 作为 KV.Supervisor 的子进程添加时,我们开始拥有监督其他监督者的监督者,形成了所谓的“监督树”。

每次向监督者添加新子进程时,都需要评估监督者策略是否正确以及子进程的顺序。在这种情况下,我们使用的是 :one_for_one,并且 KV.RegistryKV.BucketSupervisor 之前启动。

立即出现的一个缺陷是排序问题。由于 KV.Registry 调用 KV.BucketSupervisor,因此 KV.BucketSupervisor 必须在 KV.Registry 之前启动。否则,可能会出现注册表在桶监督者启动之前尝试访问它的情况。

第二个缺陷与监督策略有关。如果 KV.Registry 死亡,所有将 KV.Bucket 名称链接到桶进程的信息都会丢失。因此,KV.BucketSupervisor 及其所有子进程也必须终止 - 否则我们将拥有孤儿进程。

鉴于此观察结果,我们应该考虑转向其他监督策略。另外两个候选者是 :one_for_all:rest_for_one。使用 :rest_for_one 策略的监督者将杀死并重新启动在崩溃的子进程之后启动的子进程。在这种情况下,我们希望 KV.BucketSupervisorKV.Registry 终止时终止。这将要求桶监督者放在注册表之后,这违反了我们在上面两段中建立的排序约束。

所以我们最后的方案是全押,选择 :one_for_all 策略:只要任何一个子进程死亡,监督者就会杀死并重新启动其所有子进程。对于我们的应用程序来说,这是一个完全合理的方案,因为注册表离不开桶监督者,而桶监督者在没有注册表的情况下应该终止。让我们重新实现 KV.Supervisor 中的 init/1 以编码这些属性

  def init(:ok) do
    children = [
      {DynamicSupervisor, name: KV.BucketSupervisor, strategy: :one_for_one},
      {KV.Registry, name: KV.Registry}
    ]

    Supervisor.init(children, strategy: :one_for_all)
  end

在进入下一章之前,还有两个主题需要讨论。

测试中的共享状态

到目前为止,我们一直为每个测试启动一个注册表,以确保它们是隔离的

setup do
  registry = start_supervised!(KV.Registry)
  %{registry: registry}
end

由于我们已经更改了注册表以使用 KV.BucketSupervisor,因此我们的测试现在依赖于这个共享监督者,即使每个测试都有自己的注册表。问题是:我们应该这样做吗?

这取决于情况。只要我们只依赖于该状态的非共享部分,就可以依赖共享状态。虽然多个注册表可能会在共享桶监督者上启动桶,但这些桶和注册表彼此隔离。只有在我们使用像 DynamicSupervisor.count_children(KV.BucketSupervisor) 这样的函数时才会遇到并发问题,该函数将计算来自所有注册表的所有桶,这可能会在测试并发运行时产生不同的结果。

由于到目前为止我们只依赖于桶监督者的非共享部分,所以我们不必担心测试套件中的并发问题。如果它成为问题,我们可以为每个测试启动一个监督者,并将其作为参数传递给注册表 start_link 函数。

观察者

现在我们已经定义了我们的监督树,这是一个很好的机会来介绍 Erlang 附带的观察者工具。使用 iex -S mix 启动应用程序,并键入以下内容

iex> :observer.start()

缺少依赖项

在使用 iex -S mix 在具有 iex 的项目中运行时,observer 不会作为依赖项提供。为此,您需要在调用以下函数之前调用以下函数

iex> Mix.ensure_application!(:wx)
iex> Mix.ensure_application!(:runtime_tools)
iex> Mix.ensure_application!(:observer)
iex> :observer.start()

如果上述任何调用失败,则可能是以下情况之一:某些包管理器默认安装一个没有 WX 绑定以支持 GUI 的最小化 Erlang。在某些包管理器中,您可能能够用更完整的包替换无头 Erlang(在 Debian/Ubuntu/Arch 上寻找名为 erlang 而不是 erlang-nox 的包)。在其他管理器中,您可能需要安装一个单独的 erlang-wx(或类似名称)包。

正在讨论在未来的版本中改进这种体验。

应该会弹出一个 GUI,其中包含关于我们系统的各种信息,从一般统计数据到负载图表以及所有正在运行的进程和应用程序的列表。

在应用程序选项卡中,您将看到当前在系统中运行的所有应用程序以及它们的监督树。您可以选择 kv 应用程序以进一步探索它

Observer GUI screenshot

不仅如此,当您在终端上创建新的桶时,您应该看到在观察者中显示的监督树中生成的新进程

iex> KV.Registry.create(KV.Registry, "shopping")
:ok

我们将把进一步探索观察者提供的功能留给您。请注意,您可以双击监督树中的任何进程以检索有关它的更多信息,以及右键单击进程以发送“杀死信号”,这是一种模拟故障并查看您的监督者是否按预期反应的完美方法。

归根结底,像观察者这样的工具是您始终希望在监督树内启动进程的原因之一,即使它们是临时的,以确保它们始终可访问和可检查。

现在我们的桶已正确链接并受监督,让我们看看如何加快速度。