An asynchronous computation that may produce an error of type IO.Error..
21.12. Asynchronous Programming
The Async monad provides tools and abstractions for constructing asynchronous programs that can safely multiplex different sources of data.
Typical use cases include network servers and other interactive applications that perform IO and must react to a variety of events, such as incoming data, timeouts, and disconnections.
Generally speaking, sequential programs that interact with the operating system can use IO alone.
Parallel programs should use Tasks.
Async is most appropriate when a program may spend a significant amount of time waiting on external events or I/O.
The most important feature of Async is event selection.
Given a set of potential inputs, and a computation to be carried out in response to each, event selection ensures that computations are triggered as events occur.
Each computation is triggered exactly once, as its associated event occurs, and data can never be lost.
These properties are very difficult to ensure without appropriate library support.
Behind the scenes, asynchronous tasks are represented using tasks and promises.
An asynchronous computation runs on the current thread until it must wait for a result that is not yet available, such as a timer or incoming network data.
The missing data is represented by a Promise.
At that point, the asynchronous computation suspends.
Rather than blocking the thread, it yields control and arranges to resume once the awaited promise is resolved.
A single thread can therefore make progress on many waiting computations at once.
The standard library resolves these promises in response to operating system events—timers, sockets, signals, and name resolution—using the libuv event loop as its I/O backend.
The asynchronous model itself depends only on tasks and promises, however: any source that resolves a promise, such as a channel, can be used to reinvoke an asynchronous computation just as well.
21.12.1. Asynchronous Computations
There are three monads for writing asynchronous programs, each corresponding to one of the variants of IO:
-
Asyncdescribes asynchronous computations that may throw anIO.Error, and corresponds toIO. -
EAsyncdescribes asynchronous computations that may throw a specified type of error, and corresponds toEIO. -
BaseAsyncdescribes asynchronous computations that cannot throw an error, and corresponds toBaseIO.
An asynchronous computation that may produce an error of type ε.
An asynchronous computation that never fails.
Infinite loops in EAsync and Async use a special instance of ForIn that ensures that they don't consume stack frames.
They can therefore be used in long-running asynchronous applications such as servers without the stack overflowing.
Each of these monads has a corresponding type of asynchronous tasks that it can coordinate.
These tasks can be thought of as handles to an in-flight computation.
Calling async on a monadic action creates a task that runs in the current thread until it suspends, and calling await on a task results in a monadic action that waits for the task to complete.
A MaybeTask α represents a computation that either:
-
Is immediately available as an
αvalue, or -
Is an asynchronous computation that will eventually produce an
αvalue.
Constructors
Std.Async.MaybeTask.pure {α : Type} : α → MaybeTask α
Std.Async.MaybeTask.ofTask {α : Type} : Task α → MaybeTask α
Crucially, calling await on a task never blocks an OS-level thread.
Threads are only blocked at the boundary between the IO and the Async monads.
Under the hood, asynchronous tasks are invoked when needed by the libuv event loop.
Asynchronous tasks use the same system of priorities as other Lean tasks, and are run by the same scheduler.
21.12.1.1. Running Asynchronous Computations
Asynchronous computations can be run from IO by either waiting or blocking.
When a thread waits on an asynchronous computation, the asynchronous computation is run on the thread that is waiting.
When a thread blocks on an asynchronous computation or task, the computation is run on a worker thread in an ordinary task with the specified priority, and the calling thread calls Task.get to block on the result.
Because Async is a defined alias for EAsync, generalized field notation can be used to call EAsync.wait on a term with type Async.
Std.Async.Async.block {α : Type} (x : Async α) (prio : Task.Priority := Task.Priority.default) : IO αStd.Async.Async.block {α : Type} (x : Async α) (prio : Task.Priority := Task.Priority.default) : IO α
Block until the Async finishes and returns its value. Propagates any error encountered during execution.
Std.Async.EAsync.block {ε α : Type} (x : EAsync ε α) (prio : Task.Priority := Task.Priority.default) : EIO ε αStd.Async.EAsync.block {ε α : Type} (x : EAsync ε α) (prio : Task.Priority := Task.Priority.default) : EIO ε α
Block until the EAsync finishes and returns its value. Propagates any error encountered during execution.
Block until the ETask in x finishes and returns its value. Propagates any error encountered
during execution.
Asynchronous computations can also be run as ordinary Tasks in IO.
Std.Async.EAsync.asTask {ε α : Type} (x : EAsync ε α) (prio : Task.Priority := Task.Priority.default) : EIO ε (ETask ε α)Std.Async.EAsync.asTask {ε α : Type} (x : EAsync ε α) (prio : Task.Priority := Task.Priority.default) : EIO ε (ETask ε α)
Std.Async.BaseAsync.asTask {α : Type} (x : BaseAsync α) (prio : Task.Priority := Task.Priority.default) : BaseIO (Task α)Std.Async.BaseAsync.asTask {α : Type} (x : BaseAsync α) (prio : Task.Priority := Task.Priority.default) : BaseIO (Task α)
Compared to IO.asTask, EAsync.asTask schedules an asynchronous task.
While tasks from IO.asTask are synchronous, occupying their worker thread until completed, tasks from EAsync.asTask release their worker threads at suspension points and are reinvoked as needed by the libuv event loop.
Running an Asynchronous Computation
Async.block runs an asynchronous computation and returns its result in IO.
The following program prints a message, waits ten milliseconds, and then prints another:
module
import Std.Async
open Std.Async
def greet : Async Unit := do
IO.println "before sleeping"
sleep 10
IO.println "after sleeping"
public def main : IO Unit := greet.block
It prints both messages, with a brief pause between them:
stdoutbefore sleepingafter sleeping21.12.1.2. Managing Tasks
The typical interface to asynchronous tasks is via the MonadAsync and MonadAwait instances for a monad.
Their respective methods MonadAsync.async and MonadAwait.await are exported from Std.Async.
Typically, the main thread of execution will create some number of asynchronous tasks, then await their results when needed to make progress.
The async and await functions are not built in to the Lean compiler, and they don't trigger a whole-program transformation.
They just create or consume tasks that are associated with underlying promises in the correct manner for the framework.
Typeclass for monads that can "await" a computation of type t α in a monad m until the result is
available.
Instance Constructor
Std.Async.MonadAwait.mk
Methods
await : {α : Type} → t α → m α
Awaits the result of t α and returns it inside the m monad.
Represents monads that can launch computations asynchronously of type t in a monad m.
Instance Constructor
Std.Async.MonadAsync.mk
Methods
async : {α : Type} → m α → optParam Task.Priority Task.Priority.default → m (t α)
Starts an asynchronous computation in another monad.
To launch an asynchronous task whose value will never be needed, use background.
Std.Async.background {m t : Type → Type} {α : Type} [Monad m] [MonadAsync t m] (action : m α) (prio : Task.Priority := Task.Priority.default) : m UnitStd.Async.background {m t : Type → Type} {α : Type} [Monad m] [MonadAsync t m] (action : m α) (prio : Task.Priority := Task.Priority.default) : m Unit
This function transforms the operation inside the monad m into a task and let it run in the background.
In addition to instances for the Async monads and tasks, the library includes instances that allow reader and state monad transformers to be used with async and await.
Spawning and Awaiting Tasks
async starts a computation as a task that runs concurrently, and await waits for a task's result.
Here, a color and a flavor are fetched concurrently, and the two results are combined into a pair:
def fetchColor : Async String := do
sleep 20
return "green"
def fetchFlavor : Async String := do
sleep 20
return "sweet"
def fetchBoth : Async (String × String) := do
let color ← async fetchColor
let flavor ← async fetchFlavor
return (← await color, ← await flavor)
#eval fetchBoth.block
Background Tasks
background starts a computation whose result is never awaited.
Here, a logger runs in the background and prints each message sent to a channel:
module
import Std.Async
import Std.Sync.Channel
open Std.Async
open Std (Channel)
def logger (ch : Channel String) : Async Unit := do
while true do
IO.println (← await (← ch.recv))
public def main : IO Unit := do
let ch ← Channel.new (α := String)
Async.block do
background (logger ch)
discard <| ch.send "hello from the background"
sleep 20
The background logger prints the message it receives before the program exits:
stdouthello from the background21.12.1.3. Transforming and Inspecting Tasks
The eventual result of an asynchronous task can be transformed without first awaiting it.
AsyncTask.map applies a function to a task's result, while AsyncTask.bindIO and AsyncTask.mapTaskIO sequence further IO work onto it.
In each case, an error in the original task propagates to the transformed task.
Std.Async.AsyncTask.map {α β : Type} (f : α → β) (x : AsyncTask α) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : AsyncTask βStd.Async.AsyncTask.map {α β : Type} (f : α → β) (x : AsyncTask α) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : AsyncTask β
Std.Async.AsyncTask.bindIO {α β : Type} (x : AsyncTask α) (f : α → IO (AsyncTask β)) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : BaseIO (AsyncTask β)Std.Async.AsyncTask.bindIO {α β : Type} (x : AsyncTask α) (f : α → IO (AsyncTask β)) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : BaseIO (AsyncTask β)
Std.Async.AsyncTask.mapTaskIO {α β : Type} (f : α → IO β) (x : AsyncTask α) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : BaseIO (AsyncTask β)Std.Async.AsyncTask.mapTaskIO {α β : Type} (f : α → IO β) (x : AsyncTask α) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : BaseIO (AsyncTask β)
A task's progress can be inspected without blocking by retrieving its IO.TaskState.
A MaybeTask is either an immediately-available value or a task that will produce one.
It can be converted to an ordinary Task, have its value read by blocking, be mapped over, and have a Task of a MaybeTask collapsed into a single Task.
Std.Async.MaybeTask.map {α β : Type} (f : α → β) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : MaybeTask α → MaybeTask βStd.Async.MaybeTask.map {α β : Type} (f : α → β) (prio : Task.Priority := Task.Priority.default) (sync : Bool := false) : MaybeTask α → MaybeTask β
Maps a function over a MaybeTask.
21.12.1.4. Conversions
An existing Task, IO.Promise, or Except value can be converted into an Async computation.
These conversions make it possible to call code that produces a Task or IO.Promise, such as a wrapper around a callback-based API or a hand-written asynchronous primitive, directly from within an Async program.
The corresponding conversions from Task and Except are also available for EAsync and BaseAsync; the conversions from IO.Promise are specific to Async because a dropped promise is reported as an IO.Error.
An IO.Promise can be dropped before it is ever resolved, for example if the code that was expected to resolve it is canceled or abandoned and the last reference to the promise goes away.
After that, the promise can never be resolved.
Because reference counts are deterministic, the runtime detects this the moment it happens, rather than at some unpredictable later time.
Async.ofPromise and Async.ofPurePromise detect a dropped promise and produce an Async error rather than panicking; the message can be supplied via their error parameter, and defaults to the promise linked to the Async was dropped.
Std.Async.Async.ofPromise {α : Type} (task : IO (IO.Promise (Except IO.Error α))) (error : String := "the promise linked to the Async was dropped") : Async αStd.Async.Async.ofPromise {α : Type} (task : IO (IO.Promise (Except IO.Error α))) (error : String := "the promise linked to the Async was dropped") : Async α
Converts Promise into Async.
Std.Async.Async.ofPurePromise {α : Type} (task : IO (IO.Promise α)) (error : String := "the promise linked to the Async was dropped") : Async αStd.Async.Async.ofPurePromise {α : Type} (task : IO (IO.Promise α)) (error : String := "the promise linked to the Async was dropped") : Async α
Converts IO (IO.Promise α) to Async.
ETask.ofPromise! converts a promise to a task directly, panicking if the promise is dropped rather than producing an error.
Create an ETask that resolves to the value of the promise x. If the promise gets dropped then it
panics.
21.12.2. Concurrent Composition
Concurrent composition runs several asynchronous computations at the same time and combines their results.
These operators are defined in terms of async and await, but they provide a higher-level, more structured approach to concurrent asynchronous programming.
Each operator launches tasks on the shared scheduler at the priority given by the optional prio parameter, and then awaits them.
There are two families of concurrent operators: those that wait for every subcomputation and return all results, and those that return the result of the first subcomputation that finishes.
Async.concurrently runs two computations and returns their results as a pair, while Async.concurrentlyAll runs an array of computations and returns their results in the same order.
Both wait for every subcomputation to finish, awaiting them positionally rather than chronologically, so an exception is reported in the position of the failing subcomputation rather than in the order in which failures occur (see errors and concurrency).
Async.race runs two computations and returns the result of whichever finishes first, while Async.raceAll does the same for an array of computations.
The result of the call to Async.race or Async.raceAll is that of the first subcomputation to chronologically finish, whether it is a thrown exception or a returned value.
A computation that fails quickly takes precedence over one that succeeds slowly.
None of these operators cancel the computations whose results are not used.
In Async.race and Async.raceAll, the computations that do not finish first continue running to completion, and their results are discarded.
In Async.concurrently and Async.concurrentlyAll, a failure in one subcomputation does not stop the others.
The corresponding operators on ContextAsync, such as ContextAsync.race, do cancel the computations that are no longer needed.
To start a computation concurrently without awaiting its result, use background.
Each operator is available for BaseAsync, EAsync, and Async.
Std.Async.Async.concurrently {α β : Type} (x : Async α) (y : Async β) (prio : Task.Priority := Task.Priority.default) : Async (α × β)Std.Async.Async.concurrently {α β : Type} (x : Async α) (y : Async β) (prio : Task.Priority := Task.Priority.default) : Async (α × β)
Runs two computations concurrently and returns both results as a pair.
Std.Async.EAsync.concurrently {ε α β : Type} (x : EAsync ε α) (y : EAsync ε β) (prio : Task.Priority := Task.Priority.default) : EAsync ε (α × β)Std.Async.EAsync.concurrently {ε α β : Type} (x : EAsync ε α) (y : EAsync ε β) (prio : Task.Priority := Task.Priority.default) : EAsync ε (α × β)
Runs two computations concurrently and returns both results as a pair.
Std.Async.BaseAsync.concurrently {α β : Type} (x : BaseAsync α) (y : BaseAsync β) (prio : Task.Priority := Task.Priority.default) : BaseAsync (α × β)Std.Async.BaseAsync.concurrently {α β : Type} (x : BaseAsync α) (y : BaseAsync β) (prio : Task.Priority := Task.Priority.default) : BaseAsync (α × β)
Runs two computations concurrently and returns both results as a pair.
Std.Async.Async.concurrentlyAll {α : Type} (xs : Array (Async α)) (prio : Task.Priority := Task.Priority.default) : Async (Array α)Std.Async.Async.concurrentlyAll {α : Type} (xs : Array (Async α)) (prio : Task.Priority := Task.Priority.default) : Async (Array α)
Runs all computations in an Array concurrently and returns all results as an array.
Std.Async.EAsync.concurrentlyAll {ε α : Type} (xs : Array (EAsync ε α)) (prio : Task.Priority := Task.Priority.default) : EAsync ε (Array α)Std.Async.EAsync.concurrentlyAll {ε α : Type} (xs : Array (EAsync ε α)) (prio : Task.Priority := Task.Priority.default) : EAsync ε (Array α)
Runs all computations in an Array concurrently and returns all results as an array.
Std.Async.BaseAsync.concurrentlyAll {α : Type} (xs : Array (BaseAsync α)) (prio : Task.Priority := Task.Priority.default) : BaseAsync (Array α)Std.Async.BaseAsync.concurrentlyAll {α : Type} (xs : Array (BaseAsync α)) (prio : Task.Priority := Task.Priority.default) : BaseAsync (Array α)
Runs all computations in an Array concurrently and returns all results as an array.
Std.Async.Async.race {α : Type} [Inhabited α] (x y : Async α) (prio : Task.Priority := Task.Priority.default) : Async αStd.Async.Async.race {α : Type} [Inhabited α] (x y : Async α) (prio : Task.Priority := Task.Priority.default) : Async α
Runs two computations concurrently and returns the result of the one that finishes first. The other result is lost and the other task is not cancelled, so the task will continue the execution until the end.
Std.Async.EAsync.race {α ε : Type} [Inhabited α] (x y : EAsync ε α) (prio : Task.Priority := Task.Priority.default) : EAsync ε αStd.Async.EAsync.race {α ε : Type} [Inhabited α] (x y : EAsync ε α) (prio : Task.Priority := Task.Priority.default) : EAsync ε α
Runs two computations concurrently and returns the result of the one that finishes first. The other result is lost and the other task is not cancelled, so the task will continue the execution until the end.
Std.Async.BaseAsync.race {α : Type} [Inhabited α] (x y : BaseAsync α) (prio : Task.Priority := Task.Priority.default) : BaseAsync αStd.Async.BaseAsync.race {α : Type} [Inhabited α] (x y : BaseAsync α) (prio : Task.Priority := Task.Priority.default) : BaseAsync α
Runs two computations concurrently and returns the result of the one that finishes first. The other result is lost and the other task is not cancelled, so the task will continue the execution until the end.
Std.Async.Async.raceAll.{u_1} {c : Type u_1} {α : Type} [ForM Async c (Async α)] (xs : c) (prio : Task.Priority := Task.Priority.default) : Async αStd.Async.Async.raceAll.{u_1} {c : Type u_1} {α : Type} [ForM Async c (Async α)] (xs : c) (prio : Task.Priority := Task.Priority.default) : Async α
Runs all computations concurrently and returns the result of the first one to finish. All other results are lost, and the tasks are not cancelled, so they'll continue their execution until the end.
Std.Async.EAsync.raceAll.{u_1} {α ε : Type} {c : Type u_1} [Inhabited α] [ForM (EAsync ε) c (EAsync ε α)] (xs : c) (prio : Task.Priority := Task.Priority.default) : EAsync ε αStd.Async.EAsync.raceAll.{u_1} {α ε : Type} {c : Type u_1} [Inhabited α] [ForM (EAsync ε) c (EAsync ε α)] (xs : c) (prio : Task.Priority := Task.Priority.default) : EAsync ε α
Runs all computations concurrently and returns the result of the first one to finish. All other results are lost, and the tasks are not cancelled, so they'll continue their execution until the end.
Std.Async.BaseAsync.raceAll.{u_1} {α : Type} {c : Type u_1} [Inhabited α] [ForM BaseAsync c (BaseAsync α)] (xs : c) (prio : Task.Priority := Task.Priority.default) : BaseAsync αStd.Async.BaseAsync.raceAll.{u_1} {α : Type} {c : Type u_1} [Inhabited α] [ForM BaseAsync c (BaseAsync α)] (xs : c) (prio : Task.Priority := Task.Priority.default) : BaseAsync α
Runs all computations concurrently and returns the result of the first one to finish. All other results are lost, and the tasks are not cancelled, so they'll continue their execution until the end.
21.12.3. Event Selection
Event selection involves both selectors, which are the source of events, and selectables, which pair selectors with code to be executed when the selector's event occurs. When a selector's event occurs, the selector has resolved. A selectable's code is not executed immediately when its selector resolves; instead, it is run when invoked by event selection. When a selectable whose selector has resolved is chosen for execution, it is selected.
A Selector α provides a value of type α when its event occurs, while a Selectable α contains an Async action to run when its selector has resolved.
The type of the selector in a Selectable is a field of the constructor Selectable.case, rather than a parameter to the type; this means that selectables that are waiting on different types of event data can be used together.
An event source that can be multiplexed using Selectable.one, see the documentation of
Selectable.one for how the protocol of communicating with a Selector works.
Constructor
Std.Async.Selector.mk
Fields
tryFn : Async (Option α)
registerFn : Waiter α → Async Unit
Registers a Waiter with the event source. Once data is available, the event source should
attempt to call Waiter.race and resolve the Waiter's promise if it wins. It is crucial that
data is never actually consumed from the event source unless Waiter.race wins in order to
prevent data loss.
unregisterFn : Async Unit
A cleanup function that is called once any Selector has won the Selectable.one race.
An event source together with a continuation to call on data obtained from that event source,
usually used together in conjunction with Selectable.one.
Constructor
Std.Async.Selectable.case
Fields
β : Type
selector : Selector self.β
The event source.
cont : self.β → Async α
The continuation that is called on results from the event source.
Event selection is invoked using three operators:
-
Selectable.oneblocks until one selectable's event occurs and returns the resulting value, -
Selectable.tryOnechecks whether any selectable is resolved and returns the associated value but does not block, -
Selectable.combinecreates a newSelectorwhose event occurs when any of the underlyingSelectables selector's event occurs, yielding theSelectable's data.
Performs fair and data-loss free multiplexing on the Selectables in selectables.
The protocol for this is as follows:
-
The
selectablesare shuffled randomly. -
Run
Selector.tryFnfor each element inselectables. If any succeed, the correspondingSelectable.contis executed and its result is returned immediately. -
If none succeed, a
Waiteris registered with eachSelectorusingSelector.registerFn. Once one of them resolves theWaiter, allSelector.unregisterFnfunctions are called, and theSelectable.contof the winningSelectoris executed and returned.
Performs fair and data-loss free non-blocking multiplexing on the Selectables in selectables.
This function only tries the non-blocking tryFn for each Selectable without registering
waiters or blocking. It returns some result if any Selectable is immediately available,
or none if all would block.
The protocol for this is as follows:
-
The
selectablesare shuffled randomly for fairness. -
Run
Selector.tryFnfor each element inselectables. If any succeed, the correspondingSelectable.contis executed and its result is returned assome result. -
If none succeed,
noneis returned immediately without blocking.
Creates a Selector that performs fair and data-loss free multiplexing on multiple Selectables.
This allows the multiplexing operation to be composed with other selectors.
Polling Without Blocking
Selectable.tryOne checks whether any selector has already resolved and returns the corresponding value immediately, or none if none has, rather than blocking.
Defining selection with := rather than ← makes pick the Async computation itself rather than its result, so the same poll can be run more than once.
#eval show IO (Option String × Option String × Option String) from do
let colors ← Channel.new (α := String)
let flavors ← Channel.new (α := String)
let pick := Selectable.tryOne #[
.case colors.recvSelector fun color => return color,
.case flavors.recvSelector fun flavor => return flavor
]
let whenEmpty ← pick.block
discard <| colors.send "gray"
let afterColor ← pick.block
discard <| flavors.send "salty"
let afterFlavor ← pick.block
return (whenEmpty, afterColor, afterFlavor)
Selection and Timeouts
A CloseableChannel provides a selector via CloseableChannel.recvSelector that resolves when the channel receives a value.
Selector.sleep is a selector that resolves after the specified number of milliseconds have passed.
The function recv combines these, waiting for up to 100 milliseconds to receive a value, after which it terminates without one:
def recv (ch : CloseableChannel Nat) : Async (Option Nat) := do
Selectable.one #[
.case ch.recvSelector fun n? => return n?,
.case (← Selector.sleep 100) fun () => return none
]
If the channel contains a value, then the recvSelector wins:
#eval show IO _ from do
let ch ← CloseableChannel.new (α := Nat)
discard <| ch.send 42
(recv ch).block
If not, the timer wins:
#eval show IO _ from do
let ch ← CloseableChannel.new (α := Nat)
-- nothing sent: the timeout wins
(recv ch).block
Selection
A CloseableChannel provides a selector via CloseableChannel.recvSelector that resolves when the channel receives a value.
The function recv2 selects the first value returned on either channel:
def recv2 (ch1 ch2 : CloseableChannel Nat) : Async (Option Nat) := do
Selectable.one #[
.case ch1.recvSelector fun n? => return n?,
.case ch2.recvSelector fun n? => return n?
]
If only one channel contains a value, then it is returned:
#eval show IO _ from do
let ch1 ← CloseableChannel.new (α := Nat)
let ch2 ← CloseableChannel.new (α := Nat)
discard <| ch1.send 1
(recv2 ch1 ch2).block
#eval show IO _ from do
let ch1 ← CloseableChannel.new (α := Nat)
let ch2 ← CloseableChannel.new (α := Nat)
discard <| ch2.send 2
(recv2 ch1 ch2).block
If neither channel contains a value, then recv2 blocks until one does; the first one to have a value wins:
#eval show IO _ from do
let ch1 ← CloseableChannel.new (α := Nat)
let ch2 ← CloseableChannel.new (α := Nat)
discard <| IO.asTask (prio := .dedicated) do
IO.sleep 100
ch1.send 1
discard <| IO.asTask (prio := .dedicated) do
IO.sleep 50
ch2.send 2
(recv2 ch1 ch2).block
Selectable.one throws an exception when passed an empty array of selectables, because it's impossible to get a value from nothing.
Selectable.tryOne always returns none when passed an empty array.
Event selection is fair. This means that there is an equal probability that any of the selectables with currently-resolved selectors have an equal chance of winning and having their associated code invoked. This is important because a bias in event selection can lead to one of the selectables never being called, which can in turn cause data to accumulate without bound in the source it would have handled. Behind the scenes, fairness is ensured by randomizing the order of selectables each time.
Furthermore, event selection never results in data being lost in the losing selectables. The implementation ensures that data is never removed from a selector without being passed to the selectable's code, and that resolving a selector calls the associated selectable's code at most once. Data loss and double delivery are ruled out via a protocol that distinguishes checking whether a selector is resolved from actually consuming its data along with an atomic means of selecting one of the resolved selectors.
21.12.3.1. Selection Protocol
This section is primarily intended for authors of new selectors.
Event selection begins by randomizing the order of the selectables.
It consults each selector's non-blocking poll Selector.tryFn until one of them returns some.
This is the winning selectable; its code is invoked and no further work is needed.
On this fast path, only one selector is ever consumed, so there is no risk of data loss or double delivery.
If no selector was resolved in the first iteration (that is, each tryFn returned none), then it is necessary to wait until one of the selectors is resolved.
Waiting consists of first registering a waiter with each selector; the first selector that has data wins the race via the waiters.
The winning selector consumes its event, invokes code to clean up the other waiting selectors, computes the selectable's value, and resolves an overall promise that Selectable.one is blocked on.
More specifically, this is done by creating an atomic flag (indicating that a winner has been selected) and a promise for the result of Selectable.one.
A registration loop processes each selectable in the array:
-
The system checks whether the flag is now set, indicating that a prior selector has won the race. If so, the loop terminates.
-
A waiter is registered with the selector using
Selector.registerFn. This registration process may not consume data; it merely registers interest in data should it become available. The waiter includes a reference to the atomic flag along with a promise that can be resolved with the selector's data. The selector must callraceon the waiter when the event has occurred, but it may only consume data if it wins the race. -
A task is created that observes the waiter's promise. When the promise is resolved, indicating that it has won the race, this completion callback is invoked with
noneif the promise was dropped (e.g. due to cancellation or unregistering); in this case, it should do nothing. If it is invoked withsomearound the result, then it must run anAsynccomputation that: a. propagates any error indicated by the data source's result, b. blocks until the entire registration loop is complete, c. unregisters the waiter from every selectable in the array using itsSelector.unregisterFn, and d. runs the winning selectable's code, resolving the result promise.
When the registration loop is complete, an internal promise is resolved that unblocks the winning waiter's callback. This block ensures that all registration occurs before all cleanup.
Finally, Selectable.one awaits the overall result promise, which will be resolved as soon as there is a winning callback.
21.12.3.1.1. Waiters
A waiter is a means of atomically selecting a single offered value.
Internally, it contains an atomic flag that indicates that a winner has been selected.
When a client has a value, it calls Waiter.race with two callbacks: one is used when the offered value was not accepted (it did not win the race), the other is used when it is accepted.
The callback that wins the race should resolve the waiter's promise, which is provided to the winning callback.
This two-phase protocol ensures that there is no data loss, because selectors only consume events once they've already won the race.
The core data structure for racing on winning a Selectable.one if multiple event sources are ready
at the same time. A Task can try to finish the waiter by calling Waiter.race.
Fields
promise : IO.Promise (Except IO.Error α)
Std.Async.Waiter.race {m : Type → Type} {α β : Type} [Monad m] [MonadLiftT (ST IO.RealWorld) m] (w : Waiter α) (lose : m β) (win : IO.Promise (Except IO.Error α) → m β) : m βStd.Async.Waiter.race {m : Type → Type} {α β : Type} [Monad m] [MonadLiftT (ST IO.RealWorld) m] (w : Waiter α) (lose : m β) (win : IO.Promise (Except IO.Error α) → m β) : m β
Try to atomically finish the Waiter. If the race for finishing it is won, win is executed
with the internal IO.Promise of the Waiter. This promise must under all circumstances be
resolved by win. If the race is lost some cleanup work can be done in lose.
Std.Async.Waiter.withPromise {α β : Type} (w : Waiter α) (p : IO.Promise (Except IO.Error β)) : Waiter βStd.Async.Waiter.withPromise {α β : Type} (w : Waiter α) (p : IO.Promise (Except IO.Error β)) : Waiter β
Swap out the IO.Promise within the Waiter. Note that the part which determines whether the
Waiter is finished is not swapped out.
Std.Async.Waiter.checkFinished {m : Type → Type} {α : Type} [Monad m] [MonadLiftT (ST IO.RealWorld) m] (w : Waiter α) : m BoolStd.Async.Waiter.checkFinished {m : Type → Type} {α : Type} [Monad m] [MonadLiftT (ST IO.RealWorld) m] (w : Waiter α) : m Bool
Atomically checks whether the Waiter has already finished. Note that right after this function
call ends this might have already changed.
Natural Number Ticker
A natTicker is a selector that makes a Nat available every 100 milliseconds, incrementing each time.
Its state is determined by two values:
-
a counter, which is an
IO.Refthat contains the nextNatto emit -
the time at which the process was started
The Selector.tryFn checks whether at least 100ms have elapsed for each emitted Nat.
If so, the value is incremented and returned immediately:
def tickerTryFn (counter : IO.Ref Nat) (startMs : Nat) := do
let nowMs ← IO.monoMsNow
let n ← counter.get
if nowMs ≥ startMs + n * 100 then
counter.set (n + 1)
return (some n)
else
return none
If the race was not immediately run, a waiter is registered.
After sleeping until the next Nat is ready, the waiter's race is invoked; if the race is won, then the counter is incremented:
def tickerRegisterFn (counter : IO.Ref Nat) (startMs : Nat)
(waiter : Waiter Nat) : Async Unit := do
let n ← counter.get
let delay := startMs + n * 100 - (← IO.monoMsNow)
let sleep ← Sleep.mk <| .ofNat delay
sleep.wait
waiter.race (pure ()) fun promise => do
counter.set (n + 1)
promise.resolve (.ok n)
These components can be combined into a selector:
def natTicker : IO (Selector Nat) := do
let current ← IO.mkRef 0
let startMs ← IO.monoMsNow
return {
tryFn := tickerTryFn current startMs
registerFn := tickerRegisterFn current startMs
unregisterFn := pure ()
}
This selector is not thread-safe.
Multiple uses in a single Selectable.one are safe, because they do not lose data (the set is only invoked when the race has been definitively won).
However, concurrent invocations of Selectable.one on the same natTicker can lead to data races.
Fixing this requires careful locking.
21.12.4. Standard Selectors
The standard library includes a number of selectors for events such as timers, receiving values through channels, and network sockets.
These selectors allow Async programs to reliably process inputs from many different sources.
When a selector is built on some data source, it is very important not to use the same data source directly.
For example, recvSelector and recv should not be used on the same channel.
This can lead to violations of the selector protocol when the selector relies on exclusive control over the real-world state of the data source.
Create a Selector that resolves once s has finished. s only starts when it runs inside of a Selectable.
Std.Broadcast.Receiver.recvSelector {α : Type} [Inhabited α] (ch : Std.Broadcast.Receiver α) : Selector (Option α)Std.Broadcast.Receiver.recvSelector {α : Type} [Inhabited α] (ch : Std.Broadcast.Receiver α) : Selector (Option α)
Creates a Selector that resolves once the broadcast channel ch has data available and provides that data.
Creates a selector that waits for notifications
Creates a selector that waits for cancellation.
Creates a selector that waits for cancellation.
Returns a selector that completes when the current context is cancelled. This is useful for selecting on cancellation alongside other asynchronous operations.
Creates a Selector that resolves once s has a connection available. Calling this function
does not start the connection wait, so it must not be called in parallel with accept.
Std.Async.TCP.Socket.Client.recvSelector (s : TCP.Socket.Client) (size : UInt64) : Selector (Option ByteArray)Std.Async.TCP.Socket.Client.recvSelector (s : TCP.Socket.Client) (size : UInt64) : Selector (Option ByteArray)
Creates a Selector that resolves once s has data available, up to at most size bytes,
and provides that data. Calling this function does not starts the data wait, so it must not be called
in parallel with recv?.
Creates a Selector that resolves once s has data available, up to at most size bytes,
and provides that data. If the socket has not been previously bound with bind, it is
automatically bound to 0.0.0.0 (all interfaces) with a random port.
Calling this function does starts the data wait, only when it's used with Selectable.one or combine.
It must not be called in parallel with recv.
Create a Selector that resolves once s has received the signal. Note that calling this function
does not start the signal waiter.
21.12.5. Errors
Error handling in Async mirrors error handling in IO:
-
EAsync, likeEIO, is parameterized by an error type. Behind the scenes,EAsync ε αisBaseAsync (Except ε α), and itsMonadinstance is like that ofExceptT. -
Async αisEAsync IO.Error α, just asIO αisEIO IO.Error α.
The details of error handling in Async are consequences of this arrangement.
When an asynchronous task (spawned via async) throws an exception, this is not observable in the parent.
The error surfaces when the task's result is requested via await.
If the task is never awaited, the error vanishes.
In other words, errors in tasks created via background or ContextAsync.disown are not propagated at all.
21.12.5.1. Errors and Concurrency
The concurrency operators Async.concurrently and Async.concurrentlyAll await the results of their sub-tasks positionally rather than chronologically.
This means that errors that result from these tasks are reported in source-code order, rather than the chronological order in which the errors occurred.
Concurrency and Error Propagation
failFast waits 5 milliseconds before throwing an exception, while failSlow waits 250 milliseconds:
def failFast : Async Nat := do
sleep 5
throw <| .userError "Fast failure"
def failSlow : Async Nat := do
sleep 250
throw <| .userError "Slow failure"
When run via Async.concurrently, the program fails with the error from failSlow. Even though it was chronologically produced after the failure from failFast, the result of failSlow was awaited first.
#eval Async.block do
let val ← Async.concurrently (prio := .dedicated) failSlow failFast
pure ()
Async.race and Async.raceAll return the result of the first completed task, whether it is a success or a failure.
This means that a quickly-produced error takes precedence over a slowly-produced success.
21.12.5.2. Errors in Event Selection
During selection, errors might occur at any stage of the protocol.
Errors thrown by a selector during the initial tryFn loop terminate the selection immediately.
An error thrown from a registerFn or unregisterFn, by contrast, can leave selectors that were already registered without a matching call to unregisterFn.
A selector that wins the race may resolve the promise with either Except.ok or Except.error; in the latter case, the result of the call to Selectable.one is itself an error.
21.12.6. Timers
Sleep can be used to sleep for some duration once.
The underlying timer has millisecond resolution.
Fields
native : Std.Internal.UV.Timer
Set up a Sleep that waits for duration milliseconds.
This function only initializes but does not yet start the timer.
If:
-
sis still running the timer restarts counting from now and completes afterdurationmilliseconds. -
sis not yet or not anymore running this is a no-op.
Interval can be used to repeatedly wait for some duration like a clock.
The underlying timer has millisecond resolution.
Fields
native : Std.Internal.UV.Timer
Setup up an Interval that waits for duration milliseconds.
This function only initializes but does not yet start the timer.
If:
-
iis not yet running start it and return anAsynccomputation that completes right away as the 0th multiple ofdurationhas elapsed. -
iis already running and: -
iis not running anymore this is a no-op.
If:
-
Interval.tickwas called onibefore the timer restarts counting from now and the next tick happens induration. -
iis not yet or not anymore running this is a no-op.
Sleep.stop/Interval.stop leave pending waits hanging forever, and Selector.sleep's timer only starts once it's used inside a Selectable.
Selectors and Timers
This program runs a loop.
At each iteration, it waits up to two seconds for a line of input.
If the input is provided, then it echoes it and loops again.
If the iteration times out, then the program exits.
Checking for the timeout is done by using Selectable.one to race the timer against a channel that delivers the lines of input.
This channel can be selected against, and it is fed by a dedicated thread that reads stdin.
module
import Std.Async
import Std.Sync.Channel
open Std.Async
open Std (CloseableChannel)
-- Blocking reader on a dedicated thread: forward each line, close on EOF.
partial def reader (stdin : IO.FS.Stream) (ch : CloseableChannel String) : IO Unit := do
let line ← stdin.getLine
if line.isEmpty then
discard <| (ch.close).toBaseIO
else
discard <| ch.send line
reader stdin ch
-- Echo each line; stop on EOF (channel closed) or 2s of silence.
partial def echo (ch : CloseableChannel String) : Async Unit := do
let more ← Selectable.one #[
.case ch.recvSelector fun
| some line => do IO.print (s!"got: {line}"); return true
| none => do IO.println "done"; return false,
.case (← Selector.sleep 2000) fun _ => do
IO.println "done"
return false
]
if more then echo ch
public def main : IO Unit := do
let ch ← CloseableChannel.new (α := String)
discard <| IO.asTask (prio := .dedicated) (reader (← IO.getStdin) ch)
(echo ch).block
When run with this input:
stdinOne lineAnotherit produces this output:
stdoutgot: One linegot: Anotherdone21.12.7. Asynchronous I/O
Interface for asynchronous reading operations.
Instance Constructor
Std.Async.IO.AsyncRead.mk
Methods
read : α → Async β
Interface for asynchronous streaming with selector-based iteration.
Instance Constructor
Std.Async.IO.AsyncStream.mk
Methods
next : α → Selector β
stop : α → IO Unit
21.12.7.1. Network
The standard library provides asynchronous TCP and UDP sockets along with DNS name resolution.
Operations that wait for the network, such as accepting a connection, receiving data, or resolving a name, are Async actions.
TCP and UDP sockets additionally provide selectors, namely TCP.Socket.Server.acceptSelector, TCP.Socket.Client.recvSelector, and UDP.Socket.recvSelector, so that network events can be multiplexed with other I/O using event selection.
Socket addresses are represented by the types Std.Net.SocketAddress and Std.Net.IPAddr.
As with other selectors, a socket's selector and its corresponding blocking operation each expect exclusive control of the socket.
They must not be used at the same time on the same socket.
21.12.7.1.1. TCP
TCP is connection-oriented: a client establishes a connection to a server, after which the two exchange a reliable, ordered stream of bytes. The protocol includes measures for ensuring that the data that is sent actually arrives, including re-transmission of missing parts; these features rely on having an established connection with its associated state. A TCP server socket accepts incoming connections, while a TCP client socket connects to a server and exchanges data. A server is set up by creating it, binding it to an address, listening, and then accepting connections. A client is created, connected to an address, and then used to send and receive data.
Represents a TCP server socket, managing incoming client connections.
Fields
native : Std.Internal.UV.TCP.Socket
Creates a new TCP server socket.
Binds the server socket to the specified address. Address reuse is enabled to allow rebinding the same address.
Listens for incoming connections with the given backlog.
Accepts an incoming connection.
Tries to accept an incoming connection.
Gets the local address of the server socket.
Enables the Nagle algorithm for all client sockets accepted by this server socket.
Std.Async.TCP.Socket.Server.keepAlive (s : TCP.Socket.Server) (enable : Bool) (delay : Std.Time.Second.Offset) : autoParam (delay.val ≥ 1) TCP.Socket.Server.keepAlive._auto_1 → IO UnitStd.Async.TCP.Socket.Server.keepAlive (s : TCP.Socket.Server) (enable : Bool) (delay : Std.Time.Second.Offset) : autoParam (delay.val ≥ 1) TCP.Socket.Server.keepAlive._auto_1 → IO Unit
Enables TCP keep-alive for all client sockets accepted by this server socket.
Represents a TCP client socket, used to connect to a server.
Fields
native : Std.Internal.UV.TCP.Socket
Creates a new TCP client socket.
Binds the server socket to the specified address. Address reuse is enabled to allow rebinding the same address.
Std.Async.TCP.Socket.Client.connect (s : TCP.Socket.Client) (addr : Std.Net.SocketAddress) : Async UnitStd.Async.TCP.Socket.Client.connect (s : TCP.Socket.Client) (addr : Std.Net.SocketAddress) : Async Unit
Connects the client socket to the given address.
Sends data through the client socket.
Sends multiple data buffers through the client socket.
Std.Async.TCP.Socket.Client.recv? (s : TCP.Socket.Client) (size : UInt64) : Async (Option ByteArray)Std.Async.TCP.Socket.Client.recv? (s : TCP.Socket.Client) (size : UInt64) : Async (Option ByteArray)
Receives data from the client socket. If data is received, it’s wrapped in .some. If EOF is reached,
the result is .none, indicating no more data is available. Receiving data in parallel on the same
socket is not supported. Instead, we recommend binding multiple sockets to the same address.
Furthermore calling this function in parallel with recvSelector is not supported.
Shuts down the write side of the client socket.
Gets the remote address of the client socket.
Gets the local address of the client socket.
Enables the Nagle algorithm for the client socket.
Std.Async.TCP.Socket.Client.keepAlive (s : TCP.Socket.Client) (enable : Bool) (delay : Std.Time.Second.Offset) : autoParam (delay.val ≥ 0) TCP.Socket.Client.keepAlive._auto_1 → IO UnitStd.Async.TCP.Socket.Client.keepAlive (s : TCP.Socket.Client) (enable : Bool) (delay : Std.Time.Second.Offset) : autoParam (delay.val ≥ 0) TCP.Socket.Client.keepAlive._auto_1 → IO Unit
Enables TCP keep-alive with a specified delay for the client socket.
21.12.7.1.2. UDP
Unlike TCP, UDP is connectionless: rather than first establishing a connection, a single socket sends and receives independent messages, called datagrams, to and from any address. There is no provision for ensuring that the datagrams actually arrive; with UDP, this is an application-level concern. A datagram can also be delivered to many recipients at once using broadcast or multicast.
Creates a new UDP socket.
Binds the UDP socket to the given address. Address reuse is enabled to allow rebinding the same address.
Associates the UDP socket with the given address and port, so every message sent by this socket is automatically sent to that destination.
Sends data through an UDP socket. The addr parameter specifies the destination address. If addr
is none, the data is sent to the default peer address set by connect.
Sends multiple data buffers through an UDP socket. The addr parameter specifies the destination
address. If addr is none, the data is sent to the default peer address set by connect.
Receives data from an UDP socket. size is for the maximum bytes to receive.
The promise resolves when some data is available or an error occurs. If the socket
has not been previously bound with bind, it is automatically bound to 0.0.0.0
(all interfaces) with a random port.
Furthermore calling this function in parallel with recvSelector is not supported.
Gets the local address of the UDP socket.
Gets the remote address of the UDP socket. On unconnected handles, it throws the .invalidArgument.
error.
Enables or disables broadcasting for the UDP socket.
Sets the TTL for outgoing packets.
Enables or disables multicast loopback for the UDP socket.
Sets the time-to-live (TTL) for multicast packets.
Std.Async.UDP.Socket.setMulticastInterface (s : UDP.Socket) (interfaceAddr : Std.Net.IPAddr) : IO UnitStd.Async.UDP.Socket.setMulticastInterface (s : UDP.Socket) (interfaceAddr : Std.Net.IPAddr) : IO Unit
Sets the multicast interface for sending packets.
Std.Async.UDP.Socket.setMembership (s : UDP.Socket) (multicastAddr : Std.Net.IPAddr) (interfaceAddr : Option Std.Net.IPAddr) (membership : UDP.Membership) : IO UnitStd.Async.UDP.Socket.setMembership (s : UDP.Socket) (multicastAddr : Std.Net.IPAddr) (interfaceAddr : Option Std.Net.IPAddr) (membership : UDP.Membership) : IO Unit
Sets the membership for joining or leaving a multicast group.
Membership type for multicast operations.
Constructors
21.12.7.1.3. DNS
DNS resolution converts between names and socket addresses.
DNS.getAddrInfo performs forward resolution from a host and service to a list of addresses, while DNS.getNameInfo performs reverse resolution from an address to a host and service.
Asynchronously resolves a hostname and service to an array of socket addresses.
Performs a reverse DNS lookup on a SocketAddress.
Represents a resolved hostname and service name from a socket address.
Constructor
Std.Async.DNS.NameInfo.mk
Fields
host : String
The resolved hostname (e.g., "example.com").
service : String
The service name (e.g., "http" for port 80).
21.12.7.2. Signals
Unix-style signals are asynchronous notifications that can be received from the operating system at any time.
For example, when a user presses Ctrl-C, the SIGINT signal is sent to the process.
A Signal.Waiter is a Lean representation of an underlying signal handler.
The signals that can be handled are enumerated in the type Signal:
Unix style signals for Unix and Windows. SIGKILL and SIGSTOP are missing because they cannot be caught. SIGBUS, SIGFPE, SIGILL, and SIGSEGV are missing because they cannot be caught safely by libuv. SIGPIPE is not present because the runtime ignores the signal.
Constructors
Std.Async.Signal.sighup : Signal
Hangup detected on controlling terminal or death of controlling process.
On Windows:
-
SIGHUP is generated when the user closes the console window. The program is given ~10 seconds to perform cleanup before Windows unconditionally terminates it.
Std.Async.Signal.sigint : Signal
Interrupt program.
Notes:
-
Normally delivered when the user presses CTRL+C. Not generated when terminal raw mode is enabled.
Std.Async.Signal.sigquit : Signal
Quit program.
Std.Async.Signal.sigtrap : Signal
Trace/breakpoint trap.
Std.Async.Signal.sigabrt : Signal
Abort signal.
Notes:
-
SIGABRT is not catchable if generated by certain runtime functions, such as abort or assertion failure.
-
On Windows, watchers can be created for SIGABRT, but they never receive the signal.
Std.Async.Signal.sigusr1 : Signal
User-defined signal 1.
Std.Async.Signal.sigusr2 : Signal
User-defined signal 2.
Std.Async.Signal.sigalrm : Signal
Real-time timer expired.
Std.Async.Signal.sigterm : Signal
Termination signal.
Notes:
-
On Windows, watchers can be created for SIGTERM, but they never receive the signal.
Std.Async.Signal.sigchld : Signal
Child status has changed.
Std.Async.Signal.sigcont : Signal
Continue after stop.
Std.Async.Signal.sigtstp : Signal
Stop typed at terminal.
Std.Async.Signal.sigttin : Signal
Background read attempted from control terminal.
Std.Async.Signal.sigttou : Signal
Background write attempted to control terminal.
Std.Async.Signal.sigurg : Signal
Urgent condition on socket.
Std.Async.Signal.sigxcpu : Signal
CPU time limit exceeded.
Std.Async.Signal.sigxfsz : Signal
File size limit exceeded.
Std.Async.Signal.sigvtalrm : Signal
Virtual alarm clock.
Std.Async.Signal.sigprof : Signal
Profiling timer expired.
Std.Async.Signal.sigwinch : Signal
Window size change.
Notes:
-
SIGWINCH is raised whenever the runtime detects the console has been resized.
-
Under console emulators, or on 32-bit apps on 64-bit systems, SIGWINCH is emulated.
-
In these cases, signals may not be delivered timely.
Std.Async.Signal.sigio : Signal
I/O now possible.
Std.Async.Signal.sigsys : Signal
Bad system call.
Depending on the platform, some signals cannot be caught.
On Unix-like operating systems, SIGKILL and SIGSTOP can't be caught.
SIGBUS, SIGFPE, SIGILL, or SIGSEGV can't be handled because Lean uses libuv to install signal handlers, and libuv cannot safely catch these signals.
Finally, the Lean run-time system ignores SIGPIPE.
On Windows, waiters can be created for SIGTERM and SIGABRT, but they never fire. SIGHUP fires when the console is closed, with approximately ten seconds provided for cleanup. SIGINT is not delivered in terminal raw mode, and SIGWINCH is emulated and may be untimely.
To install a signal handler, use Signal.Waiter.mk to register a signal itself.
The waiter can be used via Signal.Waiter.wait, which allows it to be waited for using await, but most use cases probably want to use Signal.Waiter.selector together with event selection to handle arriving signals by canceling ongoing work and cleaning up.
This pattern, and the Signal.Waiter API, mirror those of timers; unlike timers, the arrival of a signal is unpredictable.
Set up a Signal.Waiter that waits for the specified signum.
This function only initializes but does not yet start listening for the signal.
Create a Selector that resolves once s has received the signal. Note that calling this function
does not start the signal waiter.
Selectors and Signals
This program runs a loop.
At each iteration, it waits for a line of input or Ctrl-C, which sends SIGINT.
If the input is provided, then it echoes it and loops again.
If it receives SIGINT, then iteration stops and the program terminates.
Checking for the signal is done by using Selectable.one to race the signal handler against a channel that delivers the lines of input.
This channel can be selected against, and it is fed by a dedicated thread that reads stdin.
module
import Std.Async
import Std.Sync.Channel
open Std.Async
open Std (CloseableChannel)
-- Blocking reader on a dedicated thread: forward each line, close on EOF.
partial def reader (stdin : IO.FS.Stream) (ch : CloseableChannel String) : IO Unit := do
let line ← stdin.getLine
if line.isEmpty then
discard <| (ch.close).toBaseIO
else
discard <| ch.send line
reader stdin ch
-- Echo each line; stop on EOF (channel closed) or SIGINT (Ctrl-C).
partial def echo (sigint : Signal.Waiter) (ch : CloseableChannel String) : Async Unit := do
let more ← Selectable.one #[
.case ch.recvSelector fun
| some line => do IO.print (s!"got: {line}"); return true
| none => do IO.println "done"; return false,
.case sigint.selector fun _ => do
IO.println "interrupted"
return false
]
if more then echo sigint ch
public def main : IO Unit := do
let ch ← CloseableChannel.new (α := String)
let sigint ← Signal.Waiter.mk .sigint (repeating := true)
discard <| IO.asTask (prio := .dedicated) (reader (← IO.getStdin) ch)
(echo sigint ch).block
21.12.8. Cancellation
Typical asynchronous applications need to handle cancellation, where work needs to be abandoned.
For example, if a user presses Ctrl-C or a timeout occurs, then a download may be abandoned and temporary files cleaned up without terminating the entire application.
The ContextAsync monad provides tools for managing hierarchical trees of tasks, where canceling a task also cancels its children.
Cancellation is cooperative: tasks must explicitly check whether they've been canceled and terminate themselves. In other words, cancellation is an event that tasks may opt into observing, rather than a mechanism to forcibly terminate other tasks.
There are two primary ways to cancel a tree of ContextAsync computations:
-
ContextAsync.runexecutes a cancellable tree of tasks as an ordinaryAsynctask. When the root task is completed, the entire tree is canceled. -
ContextAsync.cancelcancels the current task and all of its children.
For cancellation to work as expected, concurrent tasks should be started with the helpers that are specifically designed for ContextAsync.
When this is not possible, use ContextAsync.runIn to associate the current cancellation context with the new computation.
An asynchronous computation with cooperative cancellation support via a CancellationContext. ContextAsync α
is equivalent to ReaderT CancellationContext Async α, providing a CancellationContext value to async
computations.
Cancels the current context with the given reason, cascading to all child contexts.
Cancellation is cooperative, operations must explicitly check isCancelled or use awaitCancellation to respond.
Runs a ContextAsync computation with a new context that cancels after the execution of the computation.
See also ContextAsync.runIn for running with an existing context.
Std.Async.ContextAsync.runIn {α : Type} (ctx : Std.CancellationContext) (x : ContextAsync α) : Async αStd.Async.ContextAsync.runIn {α : Type} (ctx : Std.CancellationContext) (x : ContextAsync α) : Async α
Runs a ContextAsync computation with a given context. See also ContextAsync.run for running with a new
context that automatically cancels after execution.
Std.Async.ContextAsync.background {α : Type} (action : ContextAsync α) (prio : Task.Priority := Task.Priority.default) : ContextAsync UnitStd.Async.ContextAsync.background {α : Type} (action : ContextAsync α) (prio : Task.Priority := Task.Priority.default) : ContextAsync Unit
Launches a ContextAsync computation in the background, discarding its result.
The computation runs independently in the background in its own child context. The parent computation does not wait
for background tasks to complete. This means that if the parent finishes its execution it will cause
the cancellation of the background functions. See also disown for launching tasks that continue independently
even after parent cancellation.
Std.Async.ContextAsync.disown {α : Type} (action : ContextAsync α) (prio : Task.Priority := Task.Priority.default) : ContextAsync UnitStd.Async.ContextAsync.disown {α : Type} (action : ContextAsync α) (prio : Task.Priority := Task.Priority.default) : ContextAsync Unit
Launches a ContextAsync computation in the background, discarding its result. It's similar to background,
but the child context is not automatically cancelled when the action completes. This allows the disowned
computation to continue running independently, even if the parent context is cancelled. The child context
will remain alive as long as the computation needs it. See also background for launching tasks that are
cancelled when the parent finishes.
Std.Async.ContextAsync.concurrently {α β : Type} (x : ContextAsync α) (y : ContextAsync β) (prio : Task.Priority := Task.Priority.default) : ContextAsync (α × β)Std.Async.ContextAsync.concurrently {α β : Type} (x : ContextAsync α) (y : ContextAsync β) (prio : Task.Priority := Task.Priority.default) : ContextAsync (α × β)
Runs two computations concurrently and returns both results. Each computation runs in its own child context; if either fails or is cancelled, both are cancelled immediately and the exception is propagated.
Std.Async.ContextAsync.race {α : Type} [Inhabited α] (x y : ContextAsync α) (prio : Task.Priority := Task.Priority.default) : ContextAsync αStd.Async.ContextAsync.race {α : Type} [Inhabited α] (x y : ContextAsync α) (prio : Task.Priority := Task.Priority.default) : ContextAsync α
Runs two computations concurrently and returns the result of the first to complete. Each computation runs in its own child context; when either completes, the other is cancelled immediately.
Std.Async.ContextAsync.raceAll.{u_1} {c : Type u_1} {α : Type} [ForM ContextAsync c (ContextAsync α)] (xs : c) (prio : Task.Priority := Task.Priority.default) : ContextAsync αStd.Async.ContextAsync.raceAll.{u_1} {c : Type u_1} {α : Type} [ForM ContextAsync c (ContextAsync α)] (xs : c) (prio : Task.Priority := Task.Priority.default) : ContextAsync α
Runs all computations concurrently and returns the first result. Each computation runs in its own child context; when the first completes successfully, all others are cancelled immediately.
21.12.8.1. Reacting to Cancellation
Asynchronous computations can react to cancellation via explicit polling with ContextAsync.isCancelled.
They can also block until the current context is canceled using ContextAsync.awaitCancellation; this is useful when there is no more work to be done until cancellation, but still allows for cleanup.
Finally, cancellation can be awaited together with other events using event selection with Selector.cancelled or ContextAsync.doneSelector (they are synonymous).
Checks if the current context is cancelled. Returns true if the context (or any ancestor) has been cancelled.
Long-running operations should periodically check this and exit gracefully when cancelled.
Observing Cancellation
ContextAsync.isCancelled reports whether the current context has been canceled.
Here, the context is canceled explicitly with ContextAsync.cancel:
#eval Async.block <| ContextAsync.run do
let before ← ContextAsync.isCancelled
ContextAsync.cancel .cancel
let after ← ContextAsync.isCancelled
return (before, after)
Cooperating with Cancellation
Because cancellation is cooperative, a long-running computation must check ContextAsync.isCancelled itself and stop once it has been canceled.
This worker records numbers until its context is canceled.
The cancellation here comes from the worker itself after three iterations, but in practice it would come from a timeout or a parent task; the worker's reaction is the same:
def worker : ContextAsync (Array Nat) := do
let log ← IO.mkRef (#[] : Array Nat)
for i in [0:100] do
if ← ContextAsync.isCancelled then break
log.modify (·.push i)
if i == 2 then ContextAsync.cancel .cancel
log.get
#eval Async.block <| ContextAsync.run worker
Waits for the current context to be cancelled.
Returns a selector that completes when the current context is cancelled. This is useful for selecting on cancellation alongside other asynchronous operations.
Interrupting a Wait
Cancellation can be awaited alongside other events using event selection.
Here, a computation waits for either a value on a channel or cancellation, whichever comes first.
Because the context is canceled before the selection runs, the cancellation branch wins and the result is none:
def waitOrCancel (ch : CloseableChannel Nat) : ContextAsync (Option Nat) := do
Selectable.one #[
.case ch.recvSelector (fun n? => return n?),
.case (← Selector.cancelled) (fun _ => return none)
]
#eval Async.block <| ContextAsync.run do
let ch ← CloseableChannel.new (α := Nat)
ContextAsync.cancel .cancel
waitOrCancel ch
Returns a selector that completes when the current context is cancelled.
21.12.8.2. Cancellation Contexts
ContextAsync is a reader on top of Async that provides access to a cancellation context.
This context contains an ID along with a mutex-guarded mutable state that encodes a tree of IDs, each with a cancellation token, and a source of unique ID values.
When child tasks are created, they are assigned new IDs and associated with the current task.
When tasks are canceled, the tree in the state is used to cancel their children.
A cancellation context that allows multiple consumers to wait until cancellation is requested. Forms a tree structure where cancelling a parent cancels all children.
Constructor
Std.CancellationContext.mk
Fields
state : Std.Mutex Std.CancellationContext.State
token : Std.CancellationToken
id : UInt64
Constructor
Std.CancellationContext.State.mk
Fields
tokens : Std.TreeMap UInt64 (Std.CancellationToken × Array UInt64) compare
Map of token IDs to optional tokens and their children.
id : UInt64
Next available ID
Creates a new root cancellation context.
Forks a child context from a parent. If the parent is already cancelled, returns the parent context. Otherwise, creates a new child that will be cancelled when the parent is cancelled.
Std.CancellationContext.cancel (x : Std.CancellationContext) (reason : Std.CancellationReason) : BaseIO UnitStd.CancellationContext.cancel (x : Std.CancellationContext) (reason : Std.CancellationReason) : BaseIO Unit
Cancels this context and all child contexts with the given reason.
Checks if the context is cancelled.
Returns the cancellation reason if the context is cancelled.
Waits for cancellation. Returns a task that completes when the context is cancelled.
Creates a selector that waits for cancellation.
Reasons for cancellation.
Constructors
Std.CancellationReason.deadline : Std.CancellationReason
Cancelled due to a deadline or timeout
Std.CancellationReason.shutdown : Std.CancellationReason
Cancelled due to shutdown
Std.CancellationReason.cancel : Std.CancellationReason
Explicitly cancelled
Std.CancellationReason.custom (msg : String) : Std.CancellationReason
Custom cancellation reason
21.12.8.3. Cancellation Tokens
A cancellation token is a mutex-guarded piece of shared mutable state that tracks whether the token has been canceled along with a set of consumers that have requested notification when cancellation occurs.
Behind the scenes, ContextAsync.isCancelled checks the current context to get the token for the current task's ID, then checks whether the cancellation reason is some or none.
A cancellation token is a synchronization primitive that allows multiple consumers to wait until cancellation is requested.
Constructor
Std.CancellationToken.mk
Fields
state : Std.Mutex Std.CancellationToken.State
The central state structure for a CancellationToken.
Constructor
Std.CancellationToken.State.mk
Fields
reason : Option Std.CancellationReason
The cancellation reason if cancelled, none otherwise.
consumers : Std.Queue Std.CancellationToken.Consumer
Consumers that are blocked waiting for cancellation.