查看源代码 任务和 gen_tcp
在本章中,我们将学习如何使用 Erlang 的 :gen_tcp
模块 来服务请求。这提供了一个很好的机会来探索 Elixir 的 Task
模块。在未来的章节中,我们将扩展我们的服务器,使其能够真正地服务命令。
回声服务器
我们将首先通过实现一个回声服务器来启动我们的 TCP 服务器。它将用请求中收到的文本发送一个响应。我们将慢慢改进我们的服务器,直到它被监督并准备好处理多个连接。
总的来说,TCP 服务器执行以下步骤
- 监听一个端口,直到端口可用并获得套接字
- 等待该端口上的客户端连接并接受它
- 读取客户端请求并写回响应
让我们实现这些步骤。移动到 apps/kv_server
应用程序,打开 lib/kv_server.ex
,并添加以下函数
defmodule KVServer do
require Logger
def accept(port) do
# The options below mean:
#
# 1. `:binary` - receives data as binaries (instead of lists)
# 2. `packet: :line` - receives data line by line
# 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
# 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
#
{:ok, socket} =
:gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])
Logger.info("Accepting connections on port #{port}")
loop_acceptor(socket)
end
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client)
loop_acceptor(socket)
end
defp serve(socket) do
socket
|> read_line()
|> write_line(socket)
serve(socket)
end
defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end
defp write_line(line, socket) do
:gen_tcp.send(socket, line)
end
end
我们将通过调用 KVServer.accept(4040)
来启动我们的服务器,其中 4040 是端口。 accept/1
中的第一步是监听端口,直到套接字可用,然后调用 loop_acceptor/1
。 loop_acceptor/1
是一个循环,接受客户端连接。对于每个接受的连接,我们都调用 serve/1
。
serve/1
是另一个循环,从套接字读取一行并将这些行写回套接字。请注意,serve/1
函数使用管道运算符 |>/2
来表达此操作流程。管道运算符计算左侧并将其结果作为第一个参数传递给右侧的函数。上面的例子
socket |> read_line() |> write_line(socket)
等同于
write_line(read_line(socket), socket)
read_line/1
实现使用 :gen_tcp.recv/2
从套接字接收数据,write_line/2
使用 :gen_tcp.send/2
写入套接字。
请注意,serve/1
是一个无限循环,在 loop_acceptor/1
内部顺序调用,因此对 loop_acceptor/1
的尾调用永远不会被执行,可以避免。但是,正如我们将看到的,我们需要在单独的进程中执行 serve/1
,所以我们很快就会需要这个尾调用。
这几乎是我们实现回声服务器所需的一切。让我们试试吧!
使用 iex -S mix
在 kv_server
应用程序中启动一个 IEx 会话。在 IEx 中,运行
iex> KVServer.accept(4040)
服务器现在正在运行,您甚至会注意到控制台被阻塞。让我们使用 一个 telnet
客户端 来访问我们的服务器。大多数操作系统上都有可用的客户端,它们的命令行通常类似
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?
输入 "hello",按回车键,您将收到 "hello"。优秀!
我的特定 telnet 客户端可以通过输入 ctrl + ]
、输入 quit
然后按 <Enter>
来退出,但是您的客户端可能需要不同的步骤。
退出 telnet 客户端后,您可能会在 IEx 会话中看到一个错误
** (MatchError) no match of right hand side value: {:error, :closed}
(kv_server) lib/kv_server.ex:45: KVServer.read_line/1
(kv_server) lib/kv_server.ex:37: KVServer.serve/1
(kv_server) lib/kv_server.ex:30: KVServer.loop_acceptor/1
这是因为我们一直在期待从 :gen_tcp.recv/2
接收数据,但客户端关闭了连接。我们需要在未来版本的服务器中更好地处理此类情况。
现在,有一个更重要的错误需要修复:如果我们的 TCP 接受者崩溃了会发生什么?由于没有监督,服务器会死亡,我们无法再服务更多请求,因为它不会重新启动。这就是为什么我们必须将服务器移到监督树中的原因。
任务
我们已经了解了代理、通用服务器和监督者。它们都是为了处理多个消息或管理状态而设计的。但是,当我们只需要执行一些任务并且仅此而已时,我们该使用什么呢?
Task
模块提供了完全此功能。例如,它有一个 Task.start_link/1
函数,该函数接收一个匿名函数并在一个新的进程中执行它,该进程将成为监督树的一部分。
让我们试试。打开 lib/kv_server/application.ex
,让我们更改 start/2
函数中的监督者,如下所示
def start(_type, _args) do
children = [
{Task, fn -> KVServer.accept(4040) end}
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
像往常一样,我们传递了一个有两个元素的元组作为子进程规范,它反过来将调用 Task.start_link/1
。
通过此更改,我们表示希望将 KVServer.accept(4040)
作为任务运行。我们现在正在对端口进行硬编码,但这可以通过几种方法更改,例如,在启动应用程序时从系统环境中读取端口
port = String.to_integer(System.get_env("PORT") || "4040")
# ...
{Task, fn -> KVServer.accept(port) end}
在您的代码中插入这些更改,现在您可以使用以下命令启动您的应用程序:PORT=4321 mix run --no-halt
,请注意我们是如何将端口作为变量传递的,但如果未指定端口,则仍然默认为 4040。
现在服务器是监督树的一部分,当我们运行应用程序时,它应该自动启动。启动您的服务器,现在传递端口,然后再次使用 telnet
客户端以确保一切正常
$ telnet 127.0.0.1 4321
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me
是的,它有效!但是,它是否 *可扩展*?
尝试同时连接两个 telnet 客户端。这样做时,您会注意到第二个客户端没有回显
$ telnet 127.0.0.1 4321
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?
它似乎根本不起作用。这是因为我们正在与接受连接的同一个进程中服务请求。当一个客户端连接时,我们无法接受另一个客户端。
任务监督者
为了使我们的服务器能够处理同时连接,我们需要有一个进程作为接受者,该接受者会生成其他进程来服务请求。一种解决方案是更改
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client)
loop_acceptor(socket)
end
也要使用 Task.start_link/1
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
Task.start_link(fn -> serve(client) end)
loop_acceptor(socket)
end
我们正在从接受者进程直接启动一个链接的任务。但是我们已经犯过一次这种错误。你记得吗?
这类似于我们在直接从注册表中调用 KV.Bucket.start_link/1
时所犯的错误。这意味着任何存储桶中的故障都会导致整个注册表崩溃。
上面的代码将具有相同的缺陷:如果我们将 serve(client)
任务链接到接受者,则在服务请求时崩溃会导致接受者(以及所有其他连接)崩溃。
我们通过使用一个简单的单对一监督者解决了注册表的问题。我们将在本文中使用相同的策略,只不过这种模式在任务中非常常见,Task
已经提供了解决方案:一个简单的单对一监督者,它将临时任务作为我们监督树的一部分启动。
让我们再次更改 start/2
,将监督者添加到我们的树中
def start(_type, _args) do
port = String.to_integer(System.get_env("PORT") || "4040")
children = [
{Task.Supervisor, name: KVServer.TaskSupervisor},
{Task, fn -> KVServer.accept(port) end}
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
我们现在将启动一个名为 KVServer.TaskSupervisor
的 Task.Supervisor
进程。请记住,由于接受者任务依赖于此监督者,因此必须首先启动监督者。
现在我们需要更改 loop_acceptor/1
以使用 Task.Supervisor
来服务每个请求
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
{:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
:ok = :gen_tcp.controlling_process(client, pid)
loop_acceptor(socket)
end
您可能会注意到,我们添加了一行::ok = :gen_tcp.controlling_process(client, pid)
。这使子进程成为 client
套接字的“控制进程”。如果我们没有这样做,接受者将在崩溃时导致所有客户端崩溃,因为套接字将与接受它们的进程绑定(这是默认行为)。
使用 PORT=4040 mix run --no-halt
启动一个新服务器,我们现在可以打开许多并发 telnet 客户端。您还会注意到,退出客户端不会导致接受者崩溃。优秀!
以下是完整的回声服务器实现
defmodule KVServer do
require Logger
@doc """
Starts accepting connections on the given `port`.
"""
def accept(port) do
{:ok, socket} = :gen_tcp.listen(port,
[:binary, packet: :line, active: false, reuseaddr: true])
Logger.info "Accepting connections on port #{port}"
loop_acceptor(socket)
end
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
{:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
:ok = :gen_tcp.controlling_process(client, pid)
loop_acceptor(socket)
end
defp serve(socket) do
socket
|> read_line()
|> write_line(socket)
serve(socket)
end
defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end
defp write_line(line, socket) do
:gen_tcp.send(socket, line)
end
end
由于我们更改了监督者规范,因此我们需要问:我们的监督策略是否仍然正确?
在这种情况下,答案是肯定的:如果接受者崩溃,则无需崩溃现有连接。另一方面,如果任务监督者崩溃,则无需崩溃接受者。
但是,仍然有一个问题,那就是重启策略。默认情况下,任务的 :restart
值设置为 :temporary
,这意味着它们不会重启。对于通过 Task.Supervisor
启动的连接,这是一个很好的默认值,因为重启失败的连接没有意义,但这对于接受者来说是一个糟糕的选择。如果接受者崩溃,我们希望它再次启动并运行。
让我们修复它。我们知道,对于形状为 {Task, fun}
的子进程,Elixir 将调用 Task.child_spec(fun)
来检索底层子进程规范。因此,人们可能会想象,要将 {Task, fun}
规范更改为具有 :restart
的 :permanent
,我们需要更改 Task
模块。但是,这样做是不可能的,因为 Task
模块是作为 Elixir 标准库的一部分定义的(即使可能,它也不太可能是一个好主意)。幸运的是,这可以通过使用 Supervisor.child_spec/2
来完成,它允许我们使用新值配置子进程规范。让我们再次重写 start/2
在 KVServer.Application
中
def start(_type, _args) do
port = String.to_integer(System.get_env("PORT") || "4040")
children = [
{Task.Supervisor, name: KVServer.TaskSupervisor},
Supervisor.child_spec({Task, fn -> KVServer.accept(port) end}, restart: :permanent)
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
现在我们有一个始终运行的接受者,它在始终运行的任务监督者下启动临时任务进程。
在下一章中,我们将开始解析客户端请求并发送响应,完成我们的服务器。