查看源代码 Task.Supervisor (Elixir v1.16.2)
一个任务主管。
此模块定义了一个主管,可用于动态管理任务。
任务主管启动时没有子进程,通常在一个主管和一个名称下启动。
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor}
]
Supervisor.start_link(children, strategy: :one_for_one)
子进程规范中给出的选项在 start_link/1
中有说明。
启动后,您可以直接在主管下启动任务,例如
task = Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
:do_some_work
end)
有关更多示例,请参阅 Task
模块。
可扩展性和分区
The Task.Supervisor
is a single process responsible for starting other processes. In some applications, the Task.Supervisor
may become a bottleneck. To address this, you can start multiple instances of the Task.Supervisor
and then pick a random instance to start the task on.
Instead of
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor}
]
and
Task.Supervisor.async(MyApp.TaskSupervisor, fn -> :do_some_work end)
You can do this
children = [
{PartitionSupervisor,
child_spec: Task.Supervisor,
name: MyApp.TaskSupervisors}
]
and then
Task.Supervisor.async(
{:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}},
fn -> :do_some_work end
)
In the code above, we start a partition supervisor that will by default start a dynamic supervisor for each core in your machine. Then, instead of calling the Task.Supervisor
by name, you call it through the partition supervisor using the {:via, PartitionSupervisor, {name, key}}
format, where name
is the name of the partition supervisor and key
is the routing key. We picked self()
as the routing key, which means each process will be assigned one of the existing task supervisors. Read the PartitionSupervisor
docs for more information.
名称注册
A Task.Supervisor
is bound to the same name registration rules as a GenServer
. Read more about them in the GenServer
docs.
总结
函数
启动一个可以等待的任务。
启动一个可以等待的任务。
启动一个可以等待的任务。
返回一个流,该流在 enumerable
中的每个元素上并发运行给定的函数 fun
。
返回一个流,其中给定函数 (module
和 function
) 并发映射到 enumerable
中的每个元素。
返回一个流,该流在 enumerable
中的每个元素上并发运行给定的 function
。
返回一个流,其中给定函数 (module
和 function
) 并发映射到 enumerable
中的每个元素。
返回所有子进程 PID,除了那些正在重启的进程。
将一个任务作为给定 supervisor
的子进程启动。
将一个任务作为给定 supervisor
的子进程启动。
启动一个新的主管。
终止具有给定 pid
的子进程。
类型
@type option() :: DynamicSupervisor.option() | DynamicSupervisor.init_option()
由 start_link
使用的选项值。
函数
@spec async(Supervisor.supervisor(), (-> any()), Keyword.t()) :: Task.t()
启动一个可以等待的任务。
The supervisor
must be a reference as defined in Supervisor
. The task will still be linked to the caller, see Task.async/1
for more information and async_nolink/3
for a non-linked variant.
Raises an error if supervisor
has reached the maximum number of children.
选项
:shutdown
-:brutal_kill
if the tasks must be killed directly on shutdown or an integer indicating the timeout value, defaults to 5000 milliseconds. The tasks must trap exits for the timeout to have an effect.
启动一个可以等待的任务。
The supervisor
must be a reference as defined in Supervisor
. The task will still be linked to the caller, see Task.async/1
for more information and async_nolink/3
for a non-linked variant.
Raises an error if supervisor
has reached the maximum number of children.
选项
:shutdown
-:brutal_kill
if the tasks must be killed directly on shutdown or an integer indicating the timeout value, defaults to 5000 milliseconds. The tasks must trap exits for the timeout to have an effect.
@spec async_nolink(Supervisor.supervisor(), (-> any()), Keyword.t()) :: Task.t()
启动一个可以等待的任务。
The supervisor
must be a reference as defined in Supervisor
. The task won't be linked to the caller, see Task.async/1
for more information.
Raises an error if supervisor
has reached the maximum number of children.
Note this function requires the task supervisor to have :temporary
as the :restart
option (the default), as async_nolink/3
keeps a direct reference to the task which is lost if the task is restarted.
选项
:shutdown
-:brutal_kill
if the tasks must be killed directly on shutdown or an integer indicating the timeout value, defaults to 5000 milliseconds. The tasks must trap exits for the timeout to have an effect.
与 OTP 行为的兼容性
If you create a task using async_nolink
inside an OTP behaviour like GenServer
, you should match on the message coming from the task inside your GenServer.handle_info/2
callback.
The reply sent by the task will be in the format {ref, result}
, where ref
is the monitor reference held by the task struct and result
is the return value of the task function.
Keep in mind that, regardless of how the task created with async_nolink
terminates, the caller's process will always receive a :DOWN
message with the same ref
value that is held by the task struct. If the task terminates normally, the reason in the :DOWN
message will be :normal
.
示例
Typically, you use async_nolink/3
when there is a reasonable expectation that the task may fail, and you don't want it to take down the caller. Let's see an example where a GenServer
is meant to run a single task and track its status
defmodule MyApp.Server do
use GenServer
# ...
def start_task do
GenServer.call(__MODULE__, :start_task)
end
# In this case the task is already running, so we just return :ok.
def handle_call(:start_task, _from, %{ref: ref} = state) when is_reference(ref) do
{:reply, :ok, state}
end
# The task is not running yet, so let's start it.
def handle_call(:start_task, _from, %{ref: nil} = state) do
task =
Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
...
end)
# We return :ok and the server will continue running
{:reply, :ok, %{state | ref: task.ref}}
end
# The task completed successfully
def handle_info({ref, answer}, %{ref: ref} = state) do
# We don't care about the DOWN message now, so let's demonitor and flush it
Process.demonitor(ref, [:flush])
# Do something with the result and then return
{:noreply, %{state | ref: nil}}
end
# The task failed
def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
# Log and possibly restart the task...
{:noreply, %{state | ref: nil}}
end
end
启动一个可以等待的任务。
The supervisor
must be a reference as defined in Supervisor
. The task won't be linked to the caller, see Task.async/1
for more information.
Raises an error if supervisor
has reached the maximum number of children.
Note this function requires the task supervisor to have :temporary
as the :restart
option (the default), as async_nolink/5
keeps a direct reference to the task which is lost if the task is restarted.
@spec async_stream( Supervisor.supervisor(), Enumerable.t(), (term() -> term()), keyword() ) :: Enumerable.t()
返回一个流,该流在 enumerable
中的每个元素上并发运行给定的函数 fun
。
Each element in enumerable
is passed as argument to the given function fun
and processed by its own task. The tasks will be spawned under the given supervisor
and linked to the caller process, similarly to async/3
.
See async_stream/6
for discussion, options, and examples.
async_stream(supervisor, enumerable, module, function, args, options \\ [])
查看源代码 (自 1.4.0 版本起)@spec async_stream( Supervisor.supervisor(), Enumerable.t(), module(), atom(), [term()], keyword() ) :: Enumerable.t()
返回一个流,其中给定函数 (module
和 function
) 并发映射到 enumerable
中的每个元素。
Each element will be prepended to the given args
and processed by its own task. The tasks will be spawned under the given supervisor
and linked to the caller process, similarly to async/5
.
When streamed, each task will emit {:ok, value}
upon successful completion or {:exit, reason}
if the caller is trapping exits. The order of results depends on the value of the :ordered
option.
The level of concurrency and the time tasks are allowed to run can be controlled via options (see the "Options" section below).
If you find yourself trapping exits to handle exits inside the async stream, consider using async_stream_nolink/6
to start tasks that are not linked to the calling process.
选项
:max_concurrency
- sets the maximum number of tasks to run at the same time. Defaults toSystem.schedulers_online/0
.:ordered
- whether the results should be returned in the same order as the input stream. This option is useful when you have large streams and don't want to buffer results before they are delivered. This is also useful when you're using the tasks for side effects. Defaults totrue
.:timeout
- the maximum amount of time to wait (in milliseconds) without receiving a task reply (across all running tasks). Defaults to5000
.:on_timeout
- what do to when a task times out. The possible values are:exit
(default) - the process that spawned the tasks exits.:kill_task
- the task that timed out is killed. The value emitted for that task is{:exit, :timeout}
.
:zip_input_on_exit
- (since v1.14.0) adds the original input to:exit
tuples. The value emitted for that task is{:exit, {input, reason}}
, whereinput
is the collection element that caused an exited during processing. Defaults tofalse
.:shutdown
-:brutal_kill
if the tasks must be killed directly on shutdown or an integer indicating the timeout value. Defaults to5000
milliseconds. The tasks must trap exits for the timeout to have an effect.
示例
Let's build a stream and then enumerate it
stream = Task.Supervisor.async_stream(MySupervisor, collection, Mod, :expensive_fun, [])
Enum.to_list(stream)
@spec async_stream_nolink( Supervisor.supervisor(), Enumerable.t(), (term() -> term()), keyword() ) :: Enumerable.t()
返回一个流,该流在 enumerable
中的每个元素上并发运行给定的 function
。
Each element in enumerable
is passed as argument to the given function fun
and processed by its own task. The tasks will be spawned under the given supervisor
and will not be linked to the caller process, similarly to async_nolink/3
.
See async_stream/6
for discussion and examples.
async_stream_nolink(supervisor, enumerable, module, function, args, options \\ [])
查看源代码 (自 1.4.0 版本起)@spec async_stream_nolink( Supervisor.supervisor(), Enumerable.t(), module(), atom(), [term()], keyword() ) :: Enumerable.t()
返回一个流,其中给定函数 (module
和 function
) 并发映射到 enumerable
中的每个元素。
在给定的 args
前面添加 enumerable
中的每个元素,并由其自己的任务处理。这些任务将在给定的 supervisor
下生成,并且不会链接到调用者进程,类似于 async_nolink/5
。
See async_stream/6
for discussion, options, and examples.
@spec children(Supervisor.supervisor()) :: [pid()]
返回所有子进程 PID,除了那些正在重启的进程。
请注意,在低内存条件下监督大量子进程时调用此函数会导致内存不足异常。
@spec start_child(Supervisor.supervisor(), (-> any()), keyword()) :: DynamicSupervisor.on_start_child()
将一个任务作为给定 supervisor
的子进程启动。
Task.Supervisor.start_child(MyTaskSupervisor, fn ->
IO.puts "I am running in a task"
end)
请注意,生成的进程不链接到调用者,而只链接到主管。如果任务需要执行副作用(如 I/O),并且您对它的结果或是否成功完成没有兴趣,则此命令很有用。
选项
:restart
- 重启策略,可以是:temporary
(默认)、:transient
或:permanent
。:temporary
表示任务永远不会重启,:transient
表示如果退出不是:normal
、:shutdown
或{:shutdown, reason}
则重启。:permanent
重启策略表示始终重启。:shutdown
- 如果任务必须在关闭时直接被杀死,则为:brutal_kill
,或者表示超时值的整数,默认为 5000 毫秒。任务必须捕获退出才能使超时生效。
@spec start_child(Supervisor.supervisor(), module(), atom(), [term()], keyword()) :: DynamicSupervisor.on_start_child()
将一个任务作为给定 supervisor
的子进程启动。
类似于 start_child/3
,只是任务由给定的 module
、fun
和 args
指定。
@spec start_link([option()]) :: Supervisor.on_start()
启动一个新的主管。
示例
任务主管通常在使用元组格式的监督树下启动
{Task.Supervisor, name: MyApp.TaskSupervisor}
您也可以通过直接调用 start_link/1
来启动它
Task.Supervisor.start_link(name: MyApp.TaskSupervisor)
但这仅推荐用于脚本,应避免在生产代码中使用。一般来说,进程应始终在监督树内启动。
选项
:name
- 用于注册主管名称,支持的值在GenServer
模块文档的Name Registration
部分中描述;:max_restarts
、:max_seconds
和:max_children
- 如DynamicSupervisor
中所述;
此函数也可以接收 :restart
和 :shutdown
作为选项,但这两个选项已被弃用,现在建议直接将它们传递给 start_child
。
@spec terminate_child(Supervisor.supervisor(), pid()) :: :ok | {:error, :not_found}
终止具有给定 pid
的子进程。