comodule co
// creation of coroutines:
type routine = <"co.routine">
def create : (routine -> null) -> routine // background routine
def createFG : (routine -> null) -> routine // foreground routine
def exit : null -> null
def yield : null -> null
// channels:
type channel(T) = <"co.channel">(T)
def channelCreate : null -> channel(T)
def channelWait : channel(T) -> null
def channelEmit : channel(T) -> T -> null
def channelEmitEnd : channel(T) -> null
def channelToStream : channel(T) -> T*
def channelFromStream : T* -> channel(T)
def channelOwn : channel(T) -> null
// Control flow inversion:
def channelFrom : ((T -> null) -> null) -> channel(T)
def channelFromEH : ((T -> null) -> null) -> channel(result(rawErrorObject,T)) // since PR1403
// Simple error handling
def onError : (string -> routine -> null) -> routine -> routine
def apply : (S -> T) -> S -> result(T,string)
def fail : string -> A
// Advanced error handling
type rawLocs = <"co.rawLocs">
type goLocation =
[ #function: string,
#file: string,
#line: number
]
type mixLocation =
[ #locFile: string,
#locFrom: [number, number], // [line, col]
#locTo: [number, number], // [line, col]
#repeat: number
]
type rawErrorObject =
{ message: string,
errorType: errorType,
rawLocation: rawLocs, // only one location
rawBacktrace: rawLocs, // the call stack
goStack: array(goLocation)
}
type debugErrorObject =
{ message: string,
errorType: errorType,
rawLocation: rawLocs, // only one location
rawBacktrace: rawLocs, // the call stack
location: string, // either location is non-empty, or mixStack
mixStack: array(mixLocation),
goStack: array(goLocation)
}
def onErrorRaw : (errorType -> string -> rawErrorObject -> routine -> null) ->
routine -> routine // since PR1403
def onErrorDebug : (errorType -> string -> debugErrorObject -> routine -> null) ->
routine -> routine // since PR1403
def failAgain : string -> rawErrorObject -> A // since PR1403
def applyRaw : (S -> T) -> S -> result([string,rawErrorObject], T) // since PR1403
def applyDebug : (S -> T) -> S -> result([string,debugErrorObject], T) // since PR1403
// Time
case timeout
case terminated // since PR1595
def sleep : number -> null
def applyWithTimeout : number -> (S -> T) -> S -> (result(string, T) | timeout | terminated) // since PR 1160, modified by PR1595
// mutexes
type mutex = <"co.mutex">
def mutexCreate : null -> mutex
def mutexLock : mutex -> null
def mutexUnlock : mutex -> null
def critical : mutex -> (null -> T) -> T
// connect channels with live cells:
def consume : T* -> T
// publish/subscribe
type pubsub(T)
type subscription(T)
def pubsubCreate : T -> pubsub(T)
def pubsubConnect : pubsub(data) -> string -> null // since PR#761
def pubsubSubscribe : pubsub(T) -> subscription(T)
def pubsubUnsubscribe : subscription(T) -> null
def pubsubSend : pubsub(T) -> T -> null // since PR#761
// def pubsubStream : subscription(T) -> T* // until PR#761
def pubsubRecvStream : subscription(T) -> T* // since PR#761
def pubsubRecvLength : subscription(T) -> number // since PR#761
// def pubsubLength : subscription(T) -> number // until PR#761
// def pubsubPublish : pubsub(T) -> T -> null // until PR#761
def pubsubSet : pubsub(T) -> T -> null // since PR#761
// def pubsubLastValueStream : subscription(T) -> T* // until PR#761
def pubsubGotUpdate : subscription(T) -> bool // since PR#761
def pubsubGet : pubsub(T) -> T
def pubsubClose : pubsub(T) -> null
def pubsubOfSubscription : subscription(T) -> pubsub(T) // since PR#1956
// sliders
// TODO, see co.mix for the time being
module end
type routine = <"co.routine">
def create : (routine -> null) -> routine
def createFG : (routine -> null) -> routine
def exit : null -> null
def yield : null -> null
A coroutine is a concurrently running function. It is created with co.create(f) or co.createFG(f). Only one coroutine can run at a time (no parallel execution). Also, a coroutine must deliberately pass control to another coroutine when it wants to allow other computations. A simple and unconditional way to do so is to call co.yield() (for other ways, see below).
A coroutine exits when
f returns, orco.exit(), orfail(msg) to artificially trigger a runtime error, orWhen a coroutine exits, all owned channels die. See below for explanation when a channel is owned by a coroutine.
Coroutines are each marked with either “background” or “foreground” status. This has no bearing on how they execute, and does not change the fact that coroutines cannot run in parallel, but some track messages give you the option of waiting for all foreground routines to complete. The top-level coroutine initiated by an API call like msg_view_invoke is always foreground. You can explicitly create more foreground routines with co.createFG and viewstack.scheduleFG. co.apply and viewstack.schedule create coroutines that inherit their background-ness from their parent coroutine. Waiting for foreground routines to complete does not wait for foreground routines that are blocked on a channel read, just foreground routines waiting on async FFI calls. (The former case would include a foreground push to another view, since the pushing coroutine uses a channel to wait until the view is popped. It wouldn’t make sense to wait for this routine to finish since it is waiting on user action, not a computation/API call.)
type channel(T) = <"co.channel">(T)
def channelCreate : null -> channel(T)
def channelWait : channel(T) -> null
def channelEmit : channel(T) -> T -> null
def channelEmitEnd : channel(T) -> null
def channelToStream : channel(T) -> T*
def channelFromStream : T* -> channel(T)
def channelOwn : channel(T) -> null
A channel is a fifo buffer that allows a coroutine to wait for more data when the buffer is empty. A channel is created with co.channelCreate() and is initially empty. You can append more data to the buffer with co.channelEmit(ch,d) or add an end marker with co.channelEmitEnd(ch). Every coroutine is allowed to emit data to a channel.
Before PR#1539: Only one coroutine can wait for new data to appear in the buffer. This is done by either calling channelWait(ch), or by getting a stream from the channel with channelToStream(ch) and then reading from the stream. When a second coroutine tries to wait for new data while the first one is still waiting, the first coroutine terminates, and the second coroutine is allowed to wait (this behavior is called “stealing a channel”).
After PR#1539: Several coroutines can wait for new data to appear in the buffer. It is undefined which of the coroutines is woken up, however, when the data is eventually available.
There is a concept of ownership: A coroutine owns a channel while it is waiting for new data on the channel. Ownership connects the lifetime of the channel with the lifetime of the coroutine. This means that the channel dies when the coroutine exits. Also, ownership allows it to figure out when a channel is stolen.
Normally, the channel is only owned for the time of waiting for new data. Sometimes, it is advantageous to permanently connect a channel with a coroutine. In this case, channelOwn can be used to mark the channel as being permanently owned by the current coroutine.
When there is a pipeline of producers, transformers and consumers, the pipeline can be in some cases automatically cleaned up. The cleanup starts with the consumer. If the consumer coroutine exits, the channels it owns will die. These channels are fed with data by the next producer upstream. When this producer writes to the dead channel, the coroutine also automatically exits. By recursion, all upstream producers can terminate this way.
There is no automatic cleanup in the other direction. It is suggested to write end markers when a producer is about to exit, so the consumer can figure this out, and react on this.
def channelFrom : ((T -> null) -> null) -> channel(T)
def channelFromEH : ((T -> null) -> null) -> channel(result(rawErrorObject,T)) // since PR1403