Standard library - co

module co
  // creation of coroutines:
  type routine = <"co.routine">
  def create : (routine -> null) -> routine   // background routine
  def createFG : (routine -> null) -> routine // foreground routine
  def exit : null -> null
  def yield : null -> null

  // channels:
  type channel(T) = <"co.channel">(T)
  def channelCreate : null -> channel(T)
  def channelWait : channel(T) -> null
  def channelEmit : channel(T) -> T -> null
  def channelEmitEnd : channel(T) -> null
  def channelToStream : channel(T) -> T*
  def channelFromStream : T* -> channel(T)
  def channelOwn : channel(T) -> null

  // Control flow inversion:
  def channelFrom : ((T -> null) -> null) -> channel(T)
  def channelFromEH : ((T -> null) -> null) -> channel(result(rawErrorObject,T))   // since PR1403

  // Simple error handling
  def onError : (string -> routine -> null) -> routine -> routine
  def apply : (S -> T) -> S -> result(T,string)
  def fail : string -> A

  // Advanced error handling
  type rawLocs = <"co.rawLocs">
  type goLocation =
    [ #function: string,
      #file: string,
      #line: number
    ]
  type mixLocation =
    [ #locFile: string,
      #locFrom: [number, number],    // [line, col]
      #locTo: [number, number],      // [line, col]
      #repeat: number
    ]
  type rawErrorObject =
    { message: string,
      errorType: errorType,
      rawLocation: rawLocs,      // only one location
      rawBacktrace: rawLocs,     // the call stack
      goStack: array(goLocation)
    }
  type debugErrorObject =
    { message: string,
      errorType: errorType,
      rawLocation: rawLocs,     // only one location
      rawBacktrace: rawLocs,    // the call stack
      location: string,         // either location is non-empty, or mixStack
      mixStack: array(mixLocation),
      goStack: array(goLocation)
    }
  def onErrorRaw : (errorType -> string -> rawErrorObject -> routine -> null) ->
                   routine -> routine                                      // since PR1403
  def onErrorDebug : (errorType -> string -> debugErrorObject -> routine -> null) ->
                     routine -> routine                                    // since PR1403
  def failAgain : string -> rawErrorObject -> A                            // since PR1403
  def applyRaw : (S -> T) -> S -> result([string,rawErrorObject], T)       // since PR1403
  def applyDebug : (S -> T) -> S -> result([string,debugErrorObject], T)   // since PR1403

  // Time
  case timeout
  case terminated     // since PR1595
  def sleep : number -> null
  def applyWithTimeout : number -> (S -> T) -> S -> (result(string, T) | timeout | terminated) // since PR1160, modified by PR1595

  // mutexes
  type mutex = <"co.mutex">
  def mutexCreate : null -> mutex
  def mutexLock : mutex -> null
  def mutexUnlock : mutex -> null
  def critical : mutex -> (null -> T) -> T

  // connect channels with live cells:
  def consume : T* -> T

  // publish/subscribe
  type pubsub(T)
  type subscription(T)
  def pubsubCreate : T -> pubsub(T)
  def pubsubConnect : pubsub(data) -> string -> null              // since PR#761
  def pubsubSubscribe : pubsub(T) -> subscription(T)
  def pubsubUnsubscribe : subscription(T) -> null
  def pubsubSend : pubsub(T) -> T -> null                         // since PR#761
  // def pubsubStream : subscription(T) -> T*                     // until PR#761
  def pubsubRecvStream : subscription(T) -> T*                    // since PR#761
  def pubsubRecvLength : subscription(T) -> number                // since PR#761
  // def pubsubLength : subscription(T) -> number                 // until PR#761
  // def pubsubPublish : pubsub(T) -> T -> null                   // until PR#761
  def pubsubSet : pubsub(T) -> T -> null                          // since PR#761
  // def pubsubLastValueStream : subscription(T) -> T*            // until PR#761
  def pubsubGotUpdate : subscription(T) -> bool                   // since PR#761
  def pubsubGet : pubsub(T) -> T
  def pubsubClose : pubsub(T) -> null
  def pubsubOfSubscription : subscription(T) -> pubsub(T)         // since PR#1956

  // sliders
  // TODO, see co.mix for the time being

module end

Creation of coroutines

  type routine = <"co.routine">
  def create : (routine -> null) -> routine
  def createFG : (routine -> null) -> routine
  def exit : null -> null
  def yield : null -> null

A coroutine is a concurrently running function. It is created with co.create(f) or co.createFG(f). Only one coroutine can run at a time (no parallel execution). Scheduling is cooperative: a coroutine keeps running until it deliberately passes control to another coroutine. A simple and unconditional way to do so is to call co.yield() (for other ways, see below).
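The cooperative scheduling described above can be illustrated with a small model. The sketch below is written in Python, not in the language of this library, and uses generators as stand-ins for coroutines. The names run_round_robin and worker are hypothetical and exist only for this illustration. The Python yield statement plays the role of co.yield(). The round-robin order is an assumption of the model; the text above does not specify how the real scheduler picks the next coroutine.

```python
from collections import deque

def run_round_robin(tasks):
    """Run generator-based tasks one at a time until all have finished.

    Only one task runs at any moment; a task gives up control only at `yield`.
    """
    log = []
    ready = deque(tasks)
    while ready:
        name, task = ready.popleft()
        try:
            step = next(task)          # run until the task's next `yield`
        except StopIteration:
            continue                   # the task exited; do not reschedule it
        log.append((name, step))
        ready.append((name, task))     # reschedule behind the other tasks
    return log

def worker(steps):
    """A task that performs `steps` slices of work (hypothetical example)."""
    for i in range(steps):
        # ... one slice of work would go here ...
        yield i                        # analogous to calling co.yield()

trace = run_round_robin([("a", worker(2)), ("b", worker(3))])
print(trace)
```

A task that never reaches a yield would keep the loop inside next(task) forever and starve the others, which is the reason a coroutine has to pass control deliberately.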

A coroutine exits when

When a coroutine exits, all channels it owns die. See below for an explanation of when a channel is owned by a coroutine.

Each coroutine is marked with either “background” or “foreground” status. This has no bearing on how coroutines execute, and does not change the fact that they cannot run in parallel, but some track messages give you the option of waiting for all foreground routines to complete.

The top-level coroutine initiated by an API call like msg_view_invoke is always foreground. You can explicitly create more foreground routines with co.createFG and viewstack.scheduleFG. co.apply and viewstack.schedule create coroutines that inherit their background or foreground status from their parent coroutine.

Waiting for foreground routines to complete only covers foreground routines that are waiting on async FFI calls. It does not cover foreground routines that are blocked on a channel read. (The latter case includes a foreground push to another view, since the pushing coroutine uses a channel to wait until the view is popped. It wouldn’t make sense to wait for this routine to finish, since it is waiting on user action, not on a computation or API call.)

Channels

  type channel(T) = <"co.channel">(T)
  def channelCreate : null -> channel(T)
  def channelWait : channel(T) -> null
  def channelEmit : channel(T) -> T -> null
  def channelEmitEnd : channel(T) -> null
  def channelToStream : channel(T) -> T*
  def channelFromStream : T* -> channel(T)
  def channelOwn : channel(T) -> null

A channel is a FIFO buffer that allows a coroutine to wait for more data when the buffer is empty. A channel is created with co.channelCreate() and is initially empty. You can append data to the buffer with co.channelEmit(ch,d) or add an end marker with co.channelEmitEnd(ch). Any coroutine may emit data to a channel.
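The buffer behavior described above can be modeled in a few lines. The sketch below is Python, not the language of this library. The class Channel, the sentinel _END and the method read_available are hypothetical names chosen for the illustration. It only models the FIFO order and the end marker; the waiting part is left out, since that needs a scheduler.

```python
from collections import deque

_END = object()  # hypothetical sentinel standing in for the end marker

class Channel:
    """Minimal model of a channel: a FIFO buffer plus an end marker."""

    def __init__(self):
        self.buffer = deque()          # a new channel starts out empty

    def emit(self, value):
        """Append a value, in the spirit of co.channelEmit(ch, d)."""
        self.buffer.append(value)

    def emit_end(self):
        """Append the end marker, in the spirit of co.channelEmitEnd(ch)."""
        self.buffer.append(_END)

    def read_available(self):
        """Return (items, ended): buffered items in FIFO order, and whether
        the end marker was reached."""
        items = []
        while self.buffer:
            item = self.buffer.popleft()
            if item is _END:
                return items, True
            items.append(item)
        return items, False            # a real reader would now wait for more data

ch = Channel()
ch.emit(1)
ch.emit(2)
first = ch.read_available()
ch.emit(3)
ch.emit_end()
second = ch.read_available()
print(first, second)
```

The first read returns the two buffered values in the order they were emitted and reports that the channel has not ended; the second read returns the remaining value and reports the end marker.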

Before PR#1539: Only one coroutine can wait for new data to appear in the buffer. This is done by either calling channelWait(ch), or by getting a stream from the channel with channelToStream(ch) and then reading from the stream. When a second coroutine tries to wait for new data while the first one is still waiting, the first coroutine terminates, and the second coroutine is allowed to wait (this behavior is called “stealing a channel”).

After PR#1539: Several coroutines can wait for new data to appear in the buffer. When data eventually becomes available, however, it is undefined which of the waiting coroutines is woken up.

There is a concept of ownership: A coroutine owns a channel while it is waiting for new data on the channel. Ownership connects the lifetime of the channel with the lifetime of the coroutine. This means that the channel dies when the coroutine exits. Ownership also makes it possible to detect when a channel is stolen.

Normally, the channel is only owned for the time of waiting for new data. Sometimes, it is advantageous to permanently connect a channel with a coroutine. In this case, channelOwn can be used to mark the channel as being permanently owned by the current coroutine.

Automatic cleanup of channels and coroutines

When there is a pipeline of producers, transformers and consumers, the pipeline can in some cases be cleaned up automatically. The cleanup starts with the consumer. If the consumer coroutine exits, the channels it owns die. These channels are fed with data by the next producer upstream. When this producer writes to the dead channel, the producer coroutine automatically exits as well. By recursion, all upstream producers can terminate this way.

There is no automatic cleanup in the other direction. It is suggested that a producer write an end marker when it is about to exit, so that the consumer can detect this and react to it.
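The downstream-to-upstream cleanup can be pictured with the following model. It is a Python sketch, not code for this library, and every name in it (Channel, ChannelDead, producer, run) is hypothetical. The automatic exit of the producer is modeled by an exception raised on a write to a dead channel; how the real runtime terminates the coroutine is not specified in the text above.

```python
class ChannelDead(Exception):
    """Models a write to a channel whose owning coroutine has exited."""

class Channel:
    def __init__(self):
        self.items = []
        self.dead = False

    def emit(self, value):
        if self.dead:
            raise ChannelDead()        # writing to a dead channel ends the writer
        self.items.append(value)

def producer(ch):
    """Endless producer: emits 0, 1, 2, ... and yields control after each write."""
    i = 0
    while True:
        ch.emit(i)
        i += 1
        yield                          # hand control back, like co.yield()

def run(consumer_wants):
    """Consume `consumer_wants` values, then let the consumer exit."""
    ch = Channel()
    prod = producer(ch)
    received = []
    while len(received) < consumer_wants:
        next(prod)                     # let the producer write one value
        received.append(ch.items.pop(0))
    ch.dead = True                     # consumer exits; the channel it owns dies
    producer_exited = False
    try:
        next(prod)                     # the producer's next write hits the dead channel
    except ChannelDead:
        producer_exited = True         # the producer terminates without explicit cleanup
    return received, producer_exited

received, exited = run(3)
print(received, exited)
```

The model only covers the direction from consumer to producer. For the opposite direction the producer would have to emit an end marker before exiting, as suggested above.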

Control flow inversion

  def channelFrom : ((T -> null) -> null) -> channel(T)
  def channelFromEH : ((T -> null) -> null) -> channel(result(rawErrorObject,T))   // since PR1403