查看源代码 Stream (Elixir v1.16.2)

用于创建和组合流的函数。

流是可组合的、延迟的枚举器(有关枚举器的介绍,请参阅 Enum 模块)。任何在枚举期间逐个生成元素的枚举器都称为流。例如,Elixir 的 Range 是一个流

iex> range = 1..5
1..5
iex> Enum.map(range, &(&1 * 2))
[2, 4, 6, 8, 10]

在上面的示例中,当我们在范围内进行映射时,枚举的元素是在枚举期间逐个创建的。 Stream 模块允许我们映射范围,而不会触发其枚举

iex> range = 1..3
iex> stream = Stream.map(range, &(&1 * 2))
iex> Enum.map(stream, &(&1 + 1))
[3, 5, 7]

请注意,我们从一个范围开始,然后我们创建了一个流,该流旨在将范围中的每个元素乘以 2。此时,没有进行任何计算。只有当 Enum.map/2 被调用时,我们才会真正枚举范围中的每个元素,将它乘以 2 并加上 1。我们说 Stream 中的函数是延迟的,而 Enum 中的函数是急切的

由于它们的延迟性,流在处理大型(甚至无限的)集合时非常有用。当使用 Enum 链多个操作时,会创建中间列表,而 Stream 创建一个计算配方,这些计算将在稍后执行。让我们看另一个例子

1..3
|> Enum.map(&IO.inspect(&1))
|> Enum.map(&(&1 * 2))
|> Enum.map(&IO.inspect(&1))
1
2
3
2
4
6
#=> [2, 4, 6]

请注意,我们首先打印了列表中的每个元素,然后将每个元素乘以 2,最后打印了每个新值。在这个例子中,列表被枚举了三次。让我们看看流的例子

stream = 1..3
|> Stream.map(&IO.inspect(&1))
|> Stream.map(&(&1 * 2))
|> Stream.map(&IO.inspect(&1))
Enum.to_list(stream)
1
2
2
4
3
6
#=> [2, 4, 6]

尽管最终结果相同,但元素打印的顺序发生了变化!使用流,我们打印第一个元素,然后打印它的两倍。在这个例子中,列表只被枚举了一次!

这就是我们之前说流是可组合的、延迟的枚举器时所指的。请注意,我们可以多次调用 Stream.map/2,有效地组合流并保持它们的延迟性。只有当您从 Enum 模块调用函数时才会执行计算。

Enum 一样,此模块中的函数在线性时间内工作。这意味着执行操作所需的时间与列表长度的增长速度相同。这在 Stream.map/2 等操作中是预期的。毕竟,如果我们要遍历流上的每个元素,流越长,我们需要遍历的元素就越多,所需的时间就越长。

创建流

Elixir 的标准库中有很多函数返回流,一些例子是

此模块还提供了许多用于创建流的便利函数,例如 Stream.cycle/1Stream.unfold/2Stream.resource/3 等等。

请注意,此模块中的函数保证返回枚举器。由于枚举器可以具有不同的形状(结构体、匿名函数等),此模块中的函数可能返回其中任何一种形状,并且这可能会随时更改。例如,今天返回匿名函数的函数在将来的版本中可能会返回一个结构体。

概要

类型

基于零的索引。

函数

通过缓冲 fun 返回相同值的元素来对 enum 进行分块。

chunk_every(enum, count, count) 的快捷方式。

将枚举器流式传输为块,每个块包含 count 个元素,其中每个新块从枚举器的 step 个元素开始。

enum 进行分块,在每个块被发出时进行细粒度的控制。

创建一个流,该流枚举枚举器中的每个枚举器。

创建一个流,该流枚举第一个参数,然后是第二个参数。

创建一个流,该流无限循环遍历给定的枚举器。

创建一个流,该流只发出与上次发出的元素不同的元素。

创建一个流,该流只发出元素,如果对元素调用 fun 的结果不同于(存储的)对上次发出的元素调用 fun 的结果。

延迟地从枚举器中删除接下来的 n 个元素。

创建一个流,该流从枚举器中删除每个第 nth 个元素。

延迟地删除枚举器中的元素,直到给定函数返回真值。

在流中将给定元素重复 n 次。

对每个元素执行给定的函数。

创建一个流,该流根据枚举期间对给定函数的调用来过滤元素。

enumerable 上的给定 fun 进行映射,并扁平化结果。

延迟地在枚举的每个元素之间插入 intersperse_element

创建一个流,该流在给定的 n 毫秒时间段后发出一个值。

将流值作为副作用注入给定的可收集对象中。

发出一个值序列,从 start_value 开始。连续的值是通过对前一个值调用 next_fun 生成的。

创建一个流,该流将在枚举期间应用给定的函数。

创建一个流,该流将在枚举器中每隔 nth 个元素应用给定的函数。

创建一个流,该流将在枚举期间根据给定函数拒绝元素。

返回一个流,该流是通过重复调用 generator_fun 生成的。

为给定的资源发出一个值序列。

运行给定的流。

创建一个流,该流将给定函数应用于每个元素,发出结果,并将相同的结果用作下一次计算的累加器。使用枚举器中的第一个元素作为起始值。

创建一个流,该流将给定函数应用于每个元素,发出结果,并将相同的结果用作下一次计算的累加器。使用给定的 acc 作为起始值。

延迟地从枚举器中获取接下来的 count 个元素,并停止枚举。

创建一个流,该流从枚举器中获取每个第 nth 个元素。

延迟地获取枚举器中的元素,直到给定函数返回真值。

创建一个流,该流在 n 毫秒后发出一个值。

转换现有的流。

类似于 Stream.transform/5,只是没有提供 last_fun

使用基于函数的启动、最后一个和之后回调来转换现有的流。

为给定的累加器发出一个值序列。

创建一个流,该流只发出唯一的元素。

创建一个流,该流只发出唯一的元素,通过删除函数 fun 返回重复元素的元素来实现。

创建一个流,其中枚举器中的每个元素都将与其索引一起封装在一个元组中。

将来自有限枚举器集合的对应元素压缩成一个元组流。

延迟地将两个枚举器压缩在一起。

延迟地将来自有限枚举器集合的对应元素压缩成一个新的枚举器,并在过程中使用 zip_fun 函数对其进行转换。

延迟地将两个枚举器中的对应元素压缩成一个新的枚举器,并在过程中使用 zip_fun 函数对其进行转换。

类型

@type acc() :: any()
@type default() :: any()
@type element() :: any()
@type index() :: non_neg_integer()

基于零的索引。

@type timer() :: non_neg_integer() | :infinity

函数

@spec chunk_by(Enumerable.t(), (element() -> any())) :: Enumerable.t()

通过缓冲 fun 返回相同值的元素来对 enum 进行分块。

只有当 fun 返回一个新值或 enum 结束时才会发出元素。

示例

iex> stream = Stream.chunk_by([1, 2, 2, 3, 4, 4, 6, 7, 7], &(rem(&1, 2) == 1))
iex> Enum.to_list(stream)
[[1], [2, 2], [3], [4, 4, 6], [7, 7]]
链接到此函数

chunk_every(enum, count)

查看源代码 (自 1.5.0 起)
@spec chunk_every(Enumerable.t(), pos_integer()) :: Enumerable.t()

chunk_every(enum, count, count) 的快捷方式。

链接到此函数

chunk_every(enum, count, step, leftover \\ [])

查看源代码 (自 1.5.0 起)
@spec chunk_every(
  Enumerable.t(),
  pos_integer(),
  pos_integer(),
  Enumerable.t() | :discard
) ::
  Enumerable.t()

将枚举器流式传输为块,每个块包含 count 个元素,其中每个新块从枚举器的 step 个元素开始。

step 是可选的,如果没有传递,则默认为 count,即块不重叠。分块将在集合结束或我们发出不完整的块时停止。

如果最后一个块没有足够的元素来填充块,则从 leftover 中获取元素来填充块。如果 leftover 中没有足够的元素来填充块,则返回一个部分块,其中包含少于 count 个元素。

如果在 leftover 中给出 :discard,则最后一个块将被丢弃,除非它恰好包含 count 个元素。

示例

iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 2) |> Enum.to_list()
[[1, 2], [3, 4], [5, 6]]

iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, :discard) |> Enum.to_list()
[[1, 2, 3], [3, 4, 5]]

iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, [7]) |> Enum.to_list()
[[1, 2, 3], [3, 4, 5], [5, 6, 7]]

iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 3, []) |> Enum.to_list()
[[1, 2, 3], [4, 5, 6]]

iex> Stream.chunk_every([1, 2, 3, 4], 3, 3, Stream.cycle([0])) |> Enum.to_list()
[[1, 2, 3], [4, 0, 0]]
链接到此函数

chunk_while(enum, acc, chunk_fun, after_fun)

查看源代码 (自 1.5.0 起)
@spec chunk_while(
  Enumerable.t(),
  acc(),
  (element(), acc() -> {:cont, chunk, acc()} | {:cont, acc()} | {:halt, acc()}),
  (acc() -> {:cont, chunk, acc()} | {:cont, acc()})
) :: Enumerable.t()
when chunk: any()

enum 进行分块,在每个块被发出时进行细粒度的控制。

chunk_fun 接收当前元素和累加器,并必须返回 {:cont, element, acc} 以发出给定块并继续使用累加器,或者返回 {:cont, acc} 以不发出任何块并继续使用返回的累加器。

after_fun 在迭代完成后被调用,也必须返回 {:cont, element, acc}{:cont, acc}

示例

iex> chunk_fun = fn element, acc ->
...>   if rem(element, 2) == 0 do
...>     {:cont, Enum.reverse([element | acc]), []}
...>   else
...>     {:cont, [element | acc]}
...>   end
...> end
iex> after_fun = fn
...>   [] -> {:cont, []}
...>   acc -> {:cont, Enum.reverse(acc), []}
...> end
iex> stream = Stream.chunk_while(1..10, [], chunk_fun, after_fun)
iex> Enum.to_list(stream)
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
@spec concat(Enumerable.t()) :: Enumerable.t()

创建一个流,该流枚举枚举器中的每个枚举器。

示例

iex> stream = Stream.concat([1..3, 4..6, 7..9])
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5, 6, 7, 8, 9]
@spec concat(Enumerable.t(), Enumerable.t()) :: Enumerable.t()

创建一个流,该流枚举第一个参数,然后是第二个参数。

示例

iex> stream = Stream.concat(1..3, 4..6)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5, 6]

iex> stream1 = Stream.cycle([1, 2, 3])
iex> stream2 = Stream.cycle([4, 5, 6])
iex> stream = Stream.concat(stream1, stream2)
iex> Enum.take(stream, 6)
[1, 2, 3, 1, 2, 3]
@spec cycle(Enumerable.t()) :: Enumerable.t()

创建一个流,该流无限循环遍历给定的枚举器。

示例

iex> stream = Stream.cycle([1, 2, 3])
iex> Enum.take(stream, 5)
[1, 2, 3, 1, 2]
@spec dedup(Enumerable.t()) :: Enumerable.t()

创建一个流,该流只发出与上次发出的元素不同的元素。

此函数只需要存储最后一个发出的元素。

元素使用 ===/2 进行比较。

示例

iex> Stream.dedup([1, 2, 3, 3, 2, 1]) |> Enum.to_list()
[1, 2, 3, 2, 1]
@spec dedup_by(Enumerable.t(), (element() -> term())) :: Enumerable.t()

创建一个流,该流只发出元素,如果对元素调用 fun 的结果不同于(存储的)对上次发出的元素调用 fun 的结果。

示例

iex> Stream.dedup_by([{1, :x}, {2, :y}, {2, :z}, {1, :x}], fn {x, _} -> x end) |> Enum.to_list()
[{1, :x}, {2, :y}, {1, :x}]
@spec drop(Enumerable.t(), integer()) :: Enumerable.t()

延迟地从枚举器中删除接下来的 n 个元素。

如果给出负数 n,它将从集合中删除最后 n 个元素。请注意,此实现机制将延迟任何元素的发出,直到枚举器发出 n 个额外元素。

示例

iex> stream = Stream.drop(1..10, 5)
iex> Enum.to_list(stream)
[6, 7, 8, 9, 10]

iex> stream = Stream.drop(1..10, -5)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec drop_every(Enumerable.t(), non_neg_integer()) :: Enumerable.t()

创建一个流,该流从枚举器中删除每个第 nth 个元素。

第一个元素总是被删除,除非 nth 为 0。

nth 必须是非负整数。

示例

iex> stream = Stream.drop_every(1..10, 2)
iex> Enum.to_list(stream)
[2, 4, 6, 8, 10]

iex> stream = Stream.drop_every(1..1000, 1)
iex> Enum.to_list(stream)
[]

iex> stream = Stream.drop_every([1, 2, 3, 4, 5], 0)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec drop_while(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()

延迟地删除枚举器中的元素,直到给定函数返回真值。

示例

iex> stream = Stream.drop_while(1..10, &(&1 <= 5))
iex> Enum.to_list(stream)
[6, 7, 8, 9, 10]
链接到此函数

duplicate(value, n)

查看源代码 (自 1.14.0 起)
@spec duplicate(any(), non_neg_integer()) :: Enumerable.t()

在流中将给定元素重复 n 次。

n 是一个大于或等于 0 的整数。

如果 n0,则返回一个空流。

示例

iex> stream = Stream.duplicate("hello", 0)
iex> Enum.to_list(stream)
[]

iex> stream = Stream.duplicate("hi", 1)
iex> Enum.to_list(stream)
["hi"]

iex> stream = Stream.duplicate("bye", 2)
iex> Enum.to_list(stream)
["bye", "bye"]

iex> stream = Stream.duplicate([1, 2], 3)
iex> Enum.to_list(stream)
[[1, 2], [1, 2], [1, 2]]
@spec each(Enumerable.t(), (element() -> term())) :: Enumerable.t()

对每个元素执行给定的函数。

流中的值不会改变,因此此函数对于向流添加副作用(如打印)很有用。如果需要生成不同的流,请参见 map/2

示例

iex> stream = Stream.each([1, 2, 3], fn x -> send(self(), x) end)
iex> Enum.to_list(stream)
iex> receive do: (x when is_integer(x) -> x)
1
iex> receive do: (x when is_integer(x) -> x)
2
iex> receive do: (x when is_integer(x) -> x)
3
@spec filter(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()

创建一个流,该流根据枚举期间对给定函数的调用来过滤元素。

示例

iex> stream = Stream.filter([1, 2, 3], fn x -> rem(x, 2) == 0 end)
iex> Enum.to_list(stream)
[2]
链接到此函数

flat_map(enum, mapper)

查看源代码
@spec flat_map(Enumerable.t(), (element() -> Enumerable.t())) :: Enumerable.t()

enumerable 上的给定 fun 进行映射,并扁平化结果。

此函数返回一个新的流,该流通过将对 enumerable 的每个元素调用 fun 的结果连接在一起构建。

示例

iex> stream = Stream.flat_map([1, 2, 3], fn x -> [x, x * 2] end)
iex> Enum.to_list(stream)
[1, 2, 2, 4, 3, 6]

iex> stream = Stream.flat_map([1, 2, 3], fn x -> [[x]] end)
iex> Enum.to_list(stream)
[[1], [2], [3]]
链接到此函数

intersperse(enumerable, intersperse_element)

查看源代码 (自 1.6.0 起)
@spec intersperse(Enumerable.t(), any()) :: Enumerable.t()

延迟地在枚举的每个元素之间插入 intersperse_element

示例

iex> Stream.intersperse([1, 2, 3], 0) |> Enum.to_list()
[1, 0, 2, 0, 3]

iex> Stream.intersperse([1], 0) |> Enum.to_list()
[1]

iex> Stream.intersperse([], 0) |> Enum.to_list()
[]
@spec interval(timer()) :: Enumerable.t()

创建一个流,该流在给定的 n 毫秒时间段后发出一个值。

发出的值是从 0 开始的递增计数器。每次流式传输新元素时,此操作都会在给定时间间隔内阻塞调用方。

不要使用此函数生成数字序列。如果不需要阻塞调用方进程,请使用 Stream.iterate(0, & &1 + 1) 代替。

示例

iex> Stream.interval(10) |> Enum.take(10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
链接到此函数

into(enum, collectable, transform \\ fn x -> x end)

查看源代码
@spec into(Enumerable.t(), Collectable.t(), (term() -> term())) :: Enumerable.t()

将流值作为副作用注入给定的可收集对象中。

此函数通常与 run/1 一起使用,因为任何评估都会延迟到流执行时。有关示例,请参见 run/1

链接到此函数

iterate(start_value, next_fun)

查看源代码
@spec iterate(element(), (element() -> element())) :: Enumerable.t()

发出一个值序列,从 start_value 开始。连续的值是通过对前一个值调用 next_fun 生成的。

示例

iex> Stream.iterate(0, &(&1 + 1)) |> Enum.take(5)
[0, 1, 2, 3, 4]
@spec map(Enumerable.t(), (element() -> any())) :: Enumerable.t()

创建一个流,该流将在枚举期间应用给定的函数。

示例

iex> stream = Stream.map([1, 2, 3], fn x -> x * 2 end)
iex> Enum.to_list(stream)
[2, 4, 6]
链接到此函数

map_every(enum, nth, fun)

查看源代码 (自 1.4.0 起)
@spec map_every(Enumerable.t(), non_neg_integer(), (element() -> any())) ::
  Enumerable.t()

创建一个流,该流将在枚举器中每隔 nth 个元素应用给定的函数。

第一个元素始终传递给给定函数。

nth 必须是非负整数。

示例

iex> stream = Stream.map_every(1..10, 2, fn x -> x * 2 end)
iex> Enum.to_list(stream)
[2, 2, 6, 4, 10, 6, 14, 8, 18, 10]

iex> stream = Stream.map_every([1, 2, 3, 4, 5], 1, fn x -> x * 2 end)
iex> Enum.to_list(stream)
[2, 4, 6, 8, 10]

iex> stream = Stream.map_every(1..5, 0, fn x -> x * 2 end)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec reject(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()

创建一个流,该流将在枚举期间根据给定函数拒绝元素。

示例

iex> stream = Stream.reject([1, 2, 3], fn x -> rem(x, 2) == 0 end)
iex> Enum.to_list(stream)
[1, 3]
链接到此函数

repeatedly(generator_fun)

查看源代码
@spec repeatedly((-> element())) :: Enumerable.t()

返回一个流,该流是通过重复调用 generator_fun 生成的。

示例

# Although not necessary, let's seed the random algorithm
iex> :rand.seed(:exsss, {1, 2, 3})
iex> Stream.repeatedly(&:rand.uniform/0) |> Enum.take(3)
[0.5455598952593053, 0.6039309974353404, 0.6684893034823949]
链接到此函数

resource(start_fun, next_fun, after_fun)

查看源代码
@spec resource(
  (-> acc()),
  (acc() -> {[element()], acc()} | {:halt, acc()}),
  (acc() -> term())
) ::
  Enumerable.t()

为给定的资源发出一个值序列。

类似于 transform/3,但初始累加值是通过 start_fun 延迟计算的,并在枚举结束时执行一个 after_fun(在成功和失败的情况下)。

通过使用前一个累加器(初始值为 start_fun 返回的结果)调用 next_fun 来生成连续的值,它必须返回一个包含要发出的元素列表和下一个累加器的元组。如果返回 {:halt, acc},则枚举结束。

顾名思义,此函数对于从资源中流式传输值很有用。

示例

Stream.resource(
  fn -> File.open!("sample") end,
  fn file ->
    case IO.read(file, :line) do
      data when is_binary(data) -> {[data], file}
      _ -> {:halt, file}
    end
  end,
  fn file -> File.close(file) end
)

iex> Stream.resource(
...>  fn ->
...>    {:ok, pid} = StringIO.open("string")
...>    pid
...>  end,
...>  fn pid ->
...>    case IO.getn(pid, "", 1) do
...>      :eof -> {:halt, pid}
...>      char -> {[char], pid}
...>    end
...>  end,
...>  fn pid -> StringIO.close(pid) end
...> ) |> Enum.to_list()
["s", "t", "r", "i", "n", "g"]
@spec run(Enumerable.t()) :: :ok

运行给定的流。

当需要运行流以产生副作用,并且对它的返回结果没有兴趣时,这很有用。

示例

打开一个文件,将所有 # 替换为 %,并将流式传输到另一个文件,而无需将整个文件加载到内存中

File.stream!("/path/to/file")
|> Stream.map(&String.replace(&1, "#", "%"))
|> Stream.into(File.stream!("/path/to/other/file"))
|> Stream.run()

在调用任何 Enum 函数或 run/1 之前,不会进行任何计算。

@spec scan(Enumerable.t(), (element(), acc() -> any())) :: Enumerable.t()

创建一个流,该流将给定函数应用于每个元素,发出结果,并将相同的结果用作下一次计算的累加器。使用枚举器中的第一个元素作为起始值。

示例

iex> stream = Stream.scan(1..5, &(&1 + &2))
iex> Enum.to_list(stream)
[1, 3, 6, 10, 15]
@spec scan(Enumerable.t(), acc(), (element(), acc() -> any())) :: Enumerable.t()

创建一个流,该流将给定函数应用于每个元素,发出结果,并将相同的结果用作下一次计算的累加器。使用给定的 acc 作为起始值。

示例

iex> stream = Stream.scan(1..5, 0, &(&1 + &2))
iex> Enum.to_list(stream)
[1, 3, 6, 10, 15]
@spec take(Enumerable.t(), integer()) :: Enumerable.t()

延迟地从枚举器中获取接下来的 count 个元素,并停止枚举。

如果给出负数 count,则将获取最后 count 个值。为此,将完全枚举集合,并在内存中保留最多 2 * count 个元素。当集合结束时,将执行最后 count 个元素。因此,在无限集合上使用负数 count 将永远不会返回。

示例

iex> stream = Stream.take(1..100, 5)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]

iex> stream = Stream.take(1..100, -5)
iex> Enum.to_list(stream)
[96, 97, 98, 99, 100]

iex> stream = Stream.cycle([1, 2, 3]) |> Stream.take(5)
iex> Enum.to_list(stream)
[1, 2, 3, 1, 2]
@spec take_every(Enumerable.t(), non_neg_integer()) :: Enumerable.t()

创建一个流,该流从枚举器中获取每个第 nth 个元素。

第一个元素始终包含在内,除非 nth 为 0。

nth 必须是非负整数。

示例

iex> stream = Stream.take_every(1..10, 2)
iex> Enum.to_list(stream)
[1, 3, 5, 7, 9]

iex> stream = Stream.take_every([1, 2, 3, 4, 5], 1)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]

iex> stream = Stream.take_every(1..1000, 0)
iex> Enum.to_list(stream)
[]
@spec take_while(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()

延迟地获取枚举器中的元素,直到给定函数返回真值。

示例

iex> stream = Stream.take_while(1..100, &(&1 <= 5))
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec timer(timer()) :: Enumerable.t()

创建一个流,该流在 n 毫秒后发出一个值。

发出的值为 0。此操作将在给定的时间内阻塞调用方,直到元素被流式传输。

示例

iex> Stream.timer(10) |> Enum.to_list()
[0]
链接到此函数

transform(enum, acc, reducer)

查看源代码
@spec transform(Enumerable.t(), acc, fun) :: Enumerable.t()
when fun: (element(), acc -> {Enumerable.t(), acc} | {:halt, acc}), acc: any()

转换现有的流。

它期望一个累加器和一个函数,该函数接收两个参数:流元素和更新后的累加器。它必须返回一个元组,其中第一个元素是一个新流(通常是一个列表)或原子 :halt,第二个元素是下一个元素要使用的累加器。

注意:此函数等效于 Enum.flat_map_reduce/3,只是此函数在流处理完毕后不会返回累加器。

示例

Stream.transform/3 很有用,因为它可以用作实现此模块中定义的许多函数的基础。例如,我们可以实现 Stream.take(enum, n) 如下

iex> enum = 1001..9999
iex> n = 3
iex> stream = Stream.transform(enum, 0, fn i, acc ->
...>   if acc < n, do: {[i], acc + 1}, else: {:halt, acc}
...> end)
iex> Enum.to_list(stream)
[1001, 1002, 1003]

Stream.transform/5 进一步概括了此函数,以允许包装资源。

链接到此函数

transform(enum, start_fun, reducer, after_fun)

查看源代码
@spec transform(Enumerable.t(), start_fun, reducer, after_fun) :: Enumerable.t()
when start_fun: (-> acc),
     reducer: (element(), acc -> {Enumerable.t(), acc} | {:halt, acc}),
     after_fun: (acc -> term()),
     acc: any()

类似于 Stream.transform/5,只是没有提供 last_fun

此函数可以看作是 Stream.resource/3Stream.transform/3 的组合。

链接到此函数

transform(enum, start_fun, reducer, last_fun, after_fun)

查看源代码
@spec transform(Enumerable.t(), start_fun, reducer, last_fun, after_fun) ::
  Enumerable.t()
when start_fun: (-> acc),
     reducer: (element(), acc -> {Enumerable.t(), acc} | {:halt, acc}),
     last_fun: (acc -> {Enumerable.t(), acc} | {:halt, acc}),
     after_fun: (acc -> term()),
     acc: any()

使用基于函数的启动、最后一个和之后回调来转换现有的流。

一旦转换开始,就会调用 start_fun 来计算初始累加器。然后,对于枚举器中的每个元素,都会使用元素和累加器调用 reducer 函数,返回新的元素和新的累加器,就像在 transform/3 中一样。

一旦集合完成,就会使用累加器调用 last_fun 来发出任何剩余的项目。然后调用 after_fun 来关闭任何资源,但不发出任何新项目。只有当给定的枚举器成功终止(因为已完成或自行停止)时,才会调用 last_fun。始终调用 after_fun,因此 after_fun 必须是用于关闭资源的那个。

链接到此函数

unfold(next_acc, next_fun)

查看源代码
@spec unfold(acc(), (acc() -> {element(), acc()} | nil)) :: Enumerable.t()

为给定的累加器发出一个值序列。

通过使用前一个累加器调用 next_fun 来生成连续的值,它必须返回一个包含当前值和下一个累加器的元组。如果返回 nil,则枚举结束。

示例

要创建一个向下计数并在零之前停止的流

iex> Stream.unfold(5, fn
...>   0 -> nil
...>   n -> {n, n - 1}
...> end) |> Enum.to_list()
[5, 4, 3, 2, 1]

如果 next_fun 从不返回 nil,则返回的流是*无限*的

iex> Stream.unfold(0, fn
...>   n -> {n, n + 1}
...> end) |> Enum.take(10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

iex> Stream.unfold(1, fn
...>   n -> {n, n * 2}
...> end) |> Enum.take(10)
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
@spec uniq(Enumerable.t()) :: Enumerable.t()

创建一个流,该流只发出唯一的元素。

请记住,为了知道一个元素是否唯一,此函数需要存储流发出的所有唯一值。因此,如果流是无限的,则存储的元素数量将无限增长,永远不会被垃圾回收。

示例

iex> Stream.uniq([1, 2, 3, 3, 2, 1]) |> Enum.to_list()
[1, 2, 3]
@spec uniq_by(Enumerable.t(), (element() -> term())) :: Enumerable.t()

创建一个流,该流只发出唯一的元素,通过删除函数 fun 返回重复元素的元素来实现。

函数 fun 将每个元素映射到一个项,该项用于确定两个元素是否重复。

请记住,为了知道一个元素是否唯一,此函数需要存储流发出的所有唯一值。因此,如果流是无限的,则存储的元素数量将无限增长,永远不会被垃圾回收。

示例

iex> Stream.uniq_by([{1, :x}, {2, :y}, {1, :z}], fn {x, _} -> x end) |> Enum.to_list()
[{1, :x}, {2, :y}]

iex> Stream.uniq_by([a: {:tea, 2}, b: {:tea, 2}, c: {:coffee, 1}], fn {_, y} -> y end) |> Enum.to_list()
[a: {:tea, 2}, c: {:coffee, 1}]
链接到此函数

with_index(enum, offset \\ 0)

查看源代码
@spec with_index(Enumerable.t(), integer()) :: Enumerable.t()

创建一个流,其中枚举器中的每个元素都将与其索引一起封装在一个元组中。

如果给出 offset,我们将从给定的偏移量而不是从零开始索引。

示例

iex> stream = Stream.with_index([1, 2, 3])
iex> Enum.to_list(stream)
[{1, 0}, {2, 1}, {3, 2}]

iex> stream = Stream.with_index([1, 2, 3], 3)
iex> Enum.to_list(stream)
[{1, 3}, {2, 4}, {3, 5}]
链接到此函数

zip(enumerables)

查看源代码 (自 1.4.0 起)
@spec zip(enumerables) :: Enumerable.t()
when enumerables: [Enumerable.t()] | Enumerable.t()

将来自有限枚举器集合的对应元素压缩成一个元组流。

一旦给定集合中的任何枚举器完成,压缩就会结束。

示例

iex> concat = Stream.concat(1..3, 4..6)
iex> cycle = Stream.cycle(["foo", "bar", "baz"])
iex> Stream.zip([concat, [:a, :b, :c], cycle]) |> Enum.to_list()
[{1, :a, "foo"}, {2, :b, "bar"}, {3, :c, "baz"}]
链接到此函数

zip(enumerable1, enumerable2)

查看源代码
@spec zip(Enumerable.t(), Enumerable.t()) :: Enumerable.t()

延迟地将两个枚举器压缩在一起。

一旦任何一个枚举器完成,压缩就会结束。

示例

iex> concat = Stream.concat(1..3, 4..6)
iex> cycle = Stream.cycle([:a, :b, :c])
iex> Stream.zip(concat, cycle) |> Enum.to_list()
[{1, :a}, {2, :b}, {3, :c}, {4, :a}, {5, :b}, {6, :c}]
链接到此函数

zip_with(enumerables, zip_fun)

查看源代码 (自 1.12.0 起)
@spec zip_with(enumerables, (Enumerable.t() -> term())) :: Enumerable.t()
when enumerables: [Enumerable.t()] | Enumerable.t()

延迟地将来自有限枚举器集合的对应元素压缩成一个新的枚举器,并在过程中使用 zip_fun 函数对其进行转换。

来自 enumerables 中每个枚举的第一个元素将被放入一个列表中,然后传递给一元 zip_fun 函数。然后,来自每个枚举的第二个元素将被放入一个列表中并传递给 zip_fun,以此类推,直到 enumerables 中的任何一个枚举完成。

返回一个新的枚举,其中包含调用 zip_fun 的结果。

示例

iex> concat = Stream.concat(1..3, 4..6)
iex> Stream.zip_with([concat, concat], fn [a, b] -> a + b end) |> Enum.to_list()
[2, 4, 6, 8, 10, 12]

iex> concat = Stream.concat(1..3, 4..6)
iex> Stream.zip_with([concat, concat, 1..3], fn [a, b, c] -> a + b + c end) |> Enum.to_list()
[3, 6, 9]
链接到此函数

zip_with(enumerable1, enumerable2, zip_fun)

查看源代码 (自 1.12.0 起)
@spec zip_with(Enumerable.t(), Enumerable.t(), (term(), term() -> term())) ::
  Enumerable.t()

延迟地将两个枚举器中的对应元素压缩成一个新的枚举器,并在过程中使用 zip_fun 函数对其进行转换。

zip_fun 将使用 enumerable1 的第一个元素和 enumerable2 的第一个元素进行调用,然后使用每个元素的第二个元素进行调用,以此类推,直到任一枚举完成。

示例

iex> concat = Stream.concat(1..3, 4..6)
iex> Stream.zip_with(concat, concat, fn a, b -> a + b end) |> Enum.to_list()
[2, 4, 6, 8, 10, 12]