async

package module
v0.17.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: MIT Imports: 8 Imported by: 0

README

async

萬物皆有靈,有靈以為生。

A Go library for asynchronous programming.

This is quite an unusual implementation that coroutines do not exchange data directly. They just yield on awaiting events and resume on event notifications. Communications between coroutines are done by sending event notifications to each other. A coroutine can watch multiple events before yielding and an event notification can resume multiple coroutines.

Continue reading, or visit Go Reference if you are not there.

Documentation

Overview

Package async is a library for asynchronous programming.

Since Go has already done a great job in bringing green/virtual threads into life, this library only implements a single-threaded Executor type, which some refer to as an async runtime. One can create as many executors as they like.

While Go excels at forking, async, on the other hand, excels at joining.

Use Case #1: Fan-In Executing Code From Goroutines

Wanted to execute pieces of code from goroutines in a single-threaded way?

An Executor is designed to be able to run tasks spawned in goroutines sequentially. This comes in handy when one wants to do a series of operations on a single thread, for example, to read or update states that are not safe for concurrent access, to write data to the console or a file, to update one's user interfaces, etc.

Be aware that there is no back pressure. Task spawning isn't designed to block. If spawning outruns execution, an Executor can easily consume a lot of memory over time. To mitigate, one could introduce a semaphore per hot spot.

Use Case #2: Event-Driven Reactiveness

An async Task can be reactive.

An async Task is spawned with a Coroutine to take care of it. In this user-provided function, one can return a specific Result to tell a coroutine to watch and await some events (e.g. Signal, State, etc.), and the coroutine can just re-run the task whenever any of these events notifies.

This is useful when one wants to do something repeatedly. It works like a loop. To exit this loop, just return a Result that does something different from within the task function. Simple.

Use Case #3: Easy State Machines Across Goroutine Boundaries

A Coroutine can also make a transition from one Task to another, just like a state machine can make a transition from one state to another. This is done by returning another specific Result from within a task function. A coroutine can transition from one task to another until a task ends it.

With the ability to transition, async is able to provide more advanced control structures, like Block, Loop and Func, to ease the process of writing async code. The experience now feels similar to that of writing sync code.

Spawning Async Tasks vs. Passing Data Over Go Channels

It's not recommended to have channel operations in an async Task for a Coroutine to do, since they tend to block. For an Executor, if one coroutine blocks, no other coroutines can run. So instead of passing data around, one would just handle data at places where data are available. Async tasks are quite flexible and composable. One can even build a state machine across goroutine boundaries, which isn't something channels are qualified to do. Async tasks are better building blocks than channels.

One of the advantages of passing data over channels is to be able to avoid allocation. Unfortunately, async tasks always escape to heap. Any variable they capture also escapes to heap. One should stay alert and take measures in hot spots, like repeatedly using a same task.

The Essentiality of Structured Concurrency

Async encourages non-blocking programming, which makes structured concurrency quite essential to this library. At some point, one might want to know when an Executor stops operating.

In Go, all Go code can only be executed by goroutines. Async tasks are just ordinary Go functions, too. If one keeps track of every goroutine that spawns async tasks, and waits for them to finish, then the exact right time when an Executor stops operating can be determined too. Essentially, it creates a synchronization point between the spawned tasks and the rest of the code after the waiting.

Root/Child Coroutines

Coroutines spawned by an Executor are root coroutines. Coroutines spawned by the Coroutine.Spawn method, in a Task function, are child coroutines.

Child coroutines are Task-scoped and, therefore, cancelable. When a Task completes, all child coroutines spawned in it are canceled.

Conversely, root coroutines are not cancelable. One must cooperatively tell a root coroutine to exit. Though, it's possible to just let them rot in the background. The Go runtime would happily garbage collect them when there are no references to them.

By default, canceled child coroutines cannot yield. All yield points are treated like exit points. However, within a NonCancelable context, a canceled child coroutine is allowed to yield, which would correspondingly cause its parent coroutine to yield, too. In such case, the parent coroutine stays suspended until all its child coroutines complete.

Panic Propagation

Child coroutines propagate unrecovered panics to their parent coroutines. Root coroutines propagate unrecovered panics to their Executor, causing the Executor.Run method to panic when it returns.

If a coroutine spawns multiple child coroutines and one of them panics without recovering, the coroutine cancels other child coroutines. Then, after all child coroutines complete, the coroutine propagates panics to its parent coroutine, or its Executor if it's a root coroutine.

Example
package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	// Create an executor.
	var myExecutor async.Executor

	// Set up an autorun function to run an executor automatically whenever a coroutine is spawned or resumed.
	// The best practice is to pass a function that does not block. See Example (NonBlocking).
	myExecutor.Autorun(myExecutor.Run)

	// Create some states.
	s1, s2 := async.NewState(1), async.NewState(2)
	op := async.NewState('+')

	// Although states can be created without the help of executors,
	// they might only be safe for use by one and only one executor due to the concern of data races.
	// Without proper synchronization, it's better only to spawn coroutines to read or update states.

	// Create a coroutine to print the sum or the product of s1 and s2, depending on what op is.
	myExecutor.Spawn(func(co *async.Coroutine) async.Result {
		co.Watch(op) // Let co depend on op, so co can re-run whenever op changes.

		fmt.Println("op =", "'"+string(op.Get())+"'")

		switch op.Get() {
		case '+':
			// Using a child coroutine to narrow down what has to react whenever a state changes might be a good idea.
			// The following creates a child coroutine, it runs immediately and re-runs whenever s1 or s2 changes.
			co.Spawn(func(co *async.Coroutine) async.Result {
				fmt.Println("s1 + s2 =", s1.Get()+s2.Get())
				return co.Yield(s1, s2) // Yields and awaits s1 and s2.
			})
		case '*':
			co.Spawn(func(co *async.Coroutine) async.Result {
				fmt.Println("s1 * s2 =", s1.Get()*s2.Get())
				return co.Yield(s1, s2)
			})
		}

		return co.Yield() // Yields and awaits anything that has been watched (in this case, op).
	})

	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Do(func() {
		s1.Set(3)
		s2.Set(4)
	}))

	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Do(func() {
		op.Set('*')
	}))

	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Do(func() {
		s1.Set(5)
		s2.Set(6)
	}))

	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Do(func() {
		s1.Set(7)
		s2.Set(8)
		op.Set('+')
	}))

}
Output:

op = '+'
s1 + s2 = 3
--- SEPARATOR ---
s1 + s2 = 7
--- SEPARATOR ---
op = '*'
s1 * s2 = 12
--- SEPARATOR ---
s1 * s2 = 30
--- SEPARATOR ---
op = '+'
s1 + s2 = 15
Example (Conditional)

This example demonstrates how a task can conditionally depend on a state.

package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	s1, s2, s3 := async.NewState(1), async.NewState(2), async.NewState(7)

	myExecutor.Spawn(func(co *async.Coroutine) async.Result {
		co.Watch(s1, s2) // Always depends on s1 and s2.

		v := s1.Get() + s2.Get()
		if v%2 == 0 {
			co.Watch(s3) // Conditionally depends on s3.
			v *= s3.Get()
		}

		fmt.Println(v)
		return co.Yield()
	})

	inc := func(i int) int { return i + 1 }

	myExecutor.Spawn(async.Do(func() { s3.Notify() })) // Nothing happens.
	myExecutor.Spawn(async.Do(func() { s1.Update(inc) }))
	myExecutor.Spawn(async.Do(func() { s3.Notify() }))
	myExecutor.Spawn(async.Do(func() { s2.Update(inc) }))
	myExecutor.Spawn(async.Do(func() { s3.Notify() })) // Nothing happens.

}
Output:

3
28
28
5
Example (End)

This example demonstrates how to end a task. It creates a task that prints the value of a state whenever it changes. The task only prints 0, 1, 2 and 3 because it is ended after 3.

package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	myExecutor.Spawn(func(co *async.Coroutine) async.Result {
		co.Watch(&myState)

		v := myState.Get()
		fmt.Println(v)

		if v < 3 {
			return co.Yield()
		}

		return co.End()
	})

	for i := 1; i <= 5; i++ {
		myExecutor.Spawn(async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 5.

}
Output:

0
1
2
3
5
Example (NonBlocking)

This example demonstrates how to set up an autorun function to run an executor in a goroutine automatically whenever a coroutine is spawned or resumed.

package main

import (
	"fmt"
	"sync"

	"github.com/b97tsk/async"
)

func main() {
	var wg sync.WaitGroup // For keeping track of goroutines.

	var myExecutor async.Executor

	myExecutor.Autorun(func() { wg.Go(myExecutor.Run) })

	s1, s2 := async.NewState(1), async.NewState(2)
	op := async.NewState('+')

	myExecutor.Spawn(func(co *async.Coroutine) async.Result {
		co.Watch(op)

		fmt.Println("op =", "'"+string(op.Get())+"'")

		switch op.Get() {
		case '+':
			co.Spawn(func(co *async.Coroutine) async.Result {
				fmt.Println("s1 + s2 =", s1.Get()+s2.Get())
				return co.Yield(s1, s2)
			})
		case '*':
			co.Spawn(func(co *async.Coroutine) async.Result {
				fmt.Println("s1 * s2 =", s1.Get()*s2.Get())
				return co.Yield(s1, s2)
			})
		}

		return co.Yield()
	})

	wg.Wait() // Wait for autorun to complete.
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Do(func() {
		s1.Set(3)
		s2.Set(4)
	}))

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Do(func() {
		op.Set('*')
	}))

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Do(func() {
		s1.Set(5)
		s2.Set(6)
	}))

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Do(func() {
		s1.Set(7)
		s2.Set(8)
		op.Set('+')
	}))

	wg.Wait()

}
Output:

op = '+'
s1 + s2 = 3
--- SEPARATOR ---
s1 + s2 = 7
--- SEPARATOR ---
op = '*'
s1 * s2 = 12
--- SEPARATOR ---
s1 * s2 = 30
--- SEPARATOR ---
op = '+'
s1 + s2 = 15
Example (PanicAndRecover)

This example demonstrates how async handles panics.

package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/b97tsk/async"
)

func main() {
	var wg sync.WaitGroup // For keeping track of goroutines.

	var myExecutor async.Executor

	dummyError := errors.New("dummy")

	myExecutor.Autorun(func() {
		wg.Go(func() {
			defer func() {
				if v := recover(); v != nil {
					err, ok := v.(error)
					if ok && errors.Is(err, dummyError) && strings.Contains(err.Error(), "dummy") {
						fmt.Println("dummy error recovered!")
						return
					}
					panic(v) // Repanic unexpected values.
				}
			}()
			myExecutor.Run()
		})
	})

	sleep := func(d time.Duration) async.Task {
		return func(co *async.Coroutine) async.Result {
			var sig async.Signal
			wg.Add(1) // Keep track of timers too.
			tm := time.AfterFunc(d, func() {
				defer wg.Done()
				myExecutor.Spawn(async.Do(sig.Notify))
			})
			co.CleanupFunc(func() {
				if tm.Stop() {
					wg.Done()
				}
			})
			return co.Await(&sig).End()
		}
	}

	recover := func(co *async.Coroutine) async.Result {
		fmt.Println(co.Recover())
		return co.End()
	}

	myExecutor.Spawn(func(co *async.Coroutine) async.Result {
		co.Defer(recover)
		panic("A")
	})

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(func(co *async.Coroutine) async.Result {
		// Cleanups are Task-scoped, while defers are Func-scoped.
		co.CleanupFunc(func() { panic("A") }) // Goes out of scope first.
		co.Defer(recover)
		return co.End()
	})

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Join(
		async.Block(
			async.Defer(recover),
			func(co *async.Coroutine) async.Result {
				co.Spawn(func(_ *async.Coroutine) async.Result {
					panic("A") // Child coroutines propagate panics.
				})
				panic("B") // Didn't run.
			},
		),
		async.Block(
			async.Defer(recover),
			func(co *async.Coroutine) async.Result {
				co.Spawn(async.Block(
					sleep(100*time.Millisecond),
					async.Do(func() { panic("A") }), // Panics after 100ms.
				))
				co.Spawn(async.Block(
					async.Defer(async.Do(func() { fmt.Println("canceled") })),
					async.Await(), // This child coroutine never ends, but it can be canceled.
				))
				return co.Await().End()
			},
		),
	))

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Join(
		async.Block(
			async.Defer(recover), // Recovers the whole panic stack (but only given the latest one).
			async.Defer(func(_ *async.Coroutine) async.Result {
				panic("B") // Panics stack up.
			}),
			async.Do(func() { panic("A") }),
		),
		async.Block(
			async.Defer(recover), // Recovers "C", while "A" is discarded.
			async.Defer(async.Block(
				// async.Func introduces a new scope for panic recovering.
				async.Func(func(co *async.Coroutine) async.Result {
					co.Defer(recover) // Recovers "B", while "A" remains in the panic stack.
					panic("B")
				}),
				async.Do(func() { panic("C") }), // Stacks up onto "A".
			)),
			async.Do(func() { panic("A") }),
		),
	))

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Block(
		async.Defer(recover),
		func(co *async.Coroutine) async.Result {
			return co.Await().Until(func() bool { panic("A") }).End()
		},
	))

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Join(
		async.Block(
			async.Defer(recover),
			async.FromSeq(func(yield func(async.Task) bool) {
				panic("A")
			}),
		),
		async.Block(
			async.Defer(recover),
			async.FromSeq(func(yield func(async.Task) bool) {
				yield(async.Return())
				panic("A")
			}),
		),
	))

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Join(
		async.Block(
			async.Defer(recover),
			async.Break(), // Break without a loop.
		),
		async.Block(
			async.Defer(recover),
			async.Continue(), // Continue without a loop.
		),
	))

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	recoverB := func(co *async.Coroutine) async.Result {
		fmt.Println(co.RecoverFunc(func(v any) bool { return v == "B" }))
		return co.End()
	}

	myExecutor.Spawn(async.Join(
		async.Block(
			async.Defer(recover),
			async.Defer(recoverB),
			async.Panic("A"), // async.Panic is like built-in panic but leaves no stack trace behind.
		),
		async.Block(
			async.Defer(recover),
			async.Defer(recoverB),
			async.Panic("B"),
		),
	))

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(func(_ *async.Coroutine) async.Result {
		panic(dummyError) // Unrecovered panics get repanicked when (*async.Executor).Run returns.
	})

	wg.Wait()

}
Output:

A
--- SEPARATOR ---
A
--- SEPARATOR ---
A
canceled
A
--- SEPARATOR ---
B
B
C
--- SEPARATOR ---
A
--- SEPARATOR ---
A
A
--- SEPARATOR ---
async: unhandled break action
async: unhandled continue action
--- SEPARATOR ---
<nil>
A
B
<nil>
--- SEPARATOR ---
dummy error recovered!
Example (Transition)

This example demonstrates how a coroutine can transition from one task to another.

package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	myExecutor.Spawn(func(co *async.Coroutine) async.Result {
		co.Watch(&myState)

		v := myState.Get()
		fmt.Println(v)

		if v < 3 {
			return co.Yield()
		}

		return co.Transition(func(co *async.Coroutine) async.Result {
			co.Watch(&myState)

			v := myState.Get()
			fmt.Println(v, "(transitioned)")

			if v < 5 {
				return co.Yield()
			}

			return co.End()
		})
	})

	for i := 1; i <= 7; i++ {
		myExecutor.Spawn(async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 7.

}
Output:

0
1
2
3
3 (transitioned)
4 (transitioned)
5 (transitioned)
7

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cleanup added in v0.5.0

type Cleanup interface {
	Cleanup()
}

Cleanup represents any type that carries a Cleanup method. A Cleanup can be added to a coroutine in a Task function for making an effect some time later when the coroutine resumes or finishes a Task.

type CleanupFunc added in v0.5.0

type CleanupFunc func()

A CleanupFunc is a func() that implements the Cleanup interface.

func (CleanupFunc) Cleanup added in v0.5.0

func (f CleanupFunc) Cleanup()

Cleanup implements the Cleanup interface.

type Coroutine added in v0.2.0

type Coroutine struct {
	// contains filtered or unexported fields
}

A Coroutine is an execution of code, similar to a goroutine but cooperative and stackless.

A coroutine is created with a function called Task. A coroutine's job is to complete the task. When an Executor spawns a coroutine with a task, it runs the coroutine by calling the task function with the coroutine as the argument. The return value determines whether to end the coroutine or to yield it so that it could resume later.

In order for a coroutine to resume, the coroutine must watch at least one Event (e.g. Signal, State, etc.), when calling the task function. A notification of such an event resumes the coroutine. When a coroutine is resumed, the executor runs the coroutine again.

A coroutine can also make a transition to work on another task according to the return value of the task function. A coroutine can transition from one task to another until a task ends it.

func (*Coroutine) Await added in v0.2.0

func (co *Coroutine) Await(ev ...Event) PendingResult

Await returns a PendingResult that can be transformed into a Result with one of its methods, which will then cause co to yield. Await also accepts additional events to watch.

func (*Coroutine) Break added in v0.3.0

func (co *Coroutine) Break() Result

Break returns a Result that will cause co to break a Loop (or LoopN).

func (*Coroutine) Canceled added in v0.15.0

func (co *Coroutine) Canceled() bool

Canceled reports whether co has been canceled.

func (*Coroutine) Cleanup added in v0.3.0

func (co *Coroutine) Cleanup(c Cleanup)

Cleanup adds something to clean up when co resumes or finishes a Task.

func (*Coroutine) CleanupFunc added in v0.5.0

func (co *Coroutine) CleanupFunc(f func())

CleanupFunc adds a function call when co resumes or finishes a Task.

func (*Coroutine) Continue added in v0.3.0

func (co *Coroutine) Continue() Result

Continue returns a Result that will cause co to continue a Loop (or LoopN).

func (*Coroutine) Defer added in v0.2.0

func (co *Coroutine) Defer(t Task)

Defer adds a Task for execution when returning from a Func. Deferred tasks are executed in last-in-first-out (LIFO) order.

func (*Coroutine) End added in v0.2.0

func (co *Coroutine) End() Result

End returns a Result that will cause co to end its current running task.

func (*Coroutine) Executor added in v0.2.0

func (co *Coroutine) Executor() *Executor

Executor returns the executor that spawned co.

func (*Coroutine) Exit added in v0.3.0

func (co *Coroutine) Exit() Result

Exit returns a Result that will cause co to exit. All deferred tasks will be run before co exits.

func (*Coroutine) Exiting added in v0.4.0

func (co *Coroutine) Exiting() bool

Exiting reports whether co is exiting.

When exiting, entering a Func, in a deferred task, would temporarily reset Exiting to false until that Func ends or exits again.

func (*Coroutine) NonCancelable added in v0.16.0

func (co *Coroutine) NonCancelable() bool

NonCancelable reports whether co is currently running a NonCancelable task.

func (*Coroutine) Panic added in v0.17.0

func (co *Coroutine) Panic(v any) Result

Panic returns a Result that will cause co to behave like there's a panic. Unlike the built-in panic function, Panic leaves no stack trace behind. Please use with caution.

func (*Coroutine) Panicking added in v0.14.0

func (co *Coroutine) Panicking() bool

Panicking reports whether co is panicking.

When panicking, entering a Func, in a deferred task, would temporarily reset Panicking to false until that Func ends or panics again.

func (*Coroutine) Parent added in v0.11.0

func (co *Coroutine) Parent() *Coroutine

Parent returns the parent coroutine of co.

Note that a coroutine must not escape to a non-child coroutine or another goroutine because, a coroutine may be put into pool for later reuse when it completes.

func (*Coroutine) Recover added in v0.14.0

func (co *Coroutine) Recover() (v any)

Recover returns the latest value in the panic stack and stops co from panicking. If co isn't panicking, Recover returns nil.

One might be tempted to use the built-in panic function and this method to mimic the power of try-catch statement in some other programming languages, but there's a cost. In order to be able to continue running, when there's a panic, a coroutine immediately recovers it and puts it into the panic stack, along with a stack trace returned by runtime/debug.Stack, which might take thousands of bytes.

Instead of using the built-in panic function to trigger a panic, one could consider use Coroutine.Panic to mimic one, which leaves no stack trace behind.

func (*Coroutine) RecoverFunc added in v0.17.0

func (co *Coroutine) RecoverFunc(f func(v any) bool) (v any)

RecoverFunc is like Coroutine.Recover but only recovers the recent panic that satisfies a condition.

func (*Coroutine) Resume added in v0.11.0

func (co *Coroutine) Resume()

Resume resumes co.

func (*Coroutine) Resumed added in v0.10.0

func (co *Coroutine) Resumed() bool

Resumed reports whether co has been resumed.

func (*Coroutine) Return added in v0.3.0

func (co *Coroutine) Return() Result

Return returns a Result that will cause co to return from a Func.

func (*Coroutine) Spawn added in v0.2.0

func (co *Coroutine) Spawn(t Task)

Spawn creates a child coroutine with the same weight as co to work on t.

Spawn runs t immediately. If t panics immediately, Spawn panics, too.

Child coroutines, if not yet ended, are canceled when the parent one resumes or finishes a Task. When a coroutine is canceled, it runs to completion with all yield points treated like exit points.

However, within a NonCancelable context, a canceled coroutine is allowed to yield, which correspondingly causes its parent coroutine to yield, too. In such case, the parent coroutine stays suspended until all its child coroutines complete.

func (*Coroutine) Transition added in v0.9.0

func (co *Coroutine) Transition(t Task) Result

Transition returns a Result that will cause co to make a transition to work on t.

func (*Coroutine) Watch added in v0.2.0

func (co *Coroutine) Watch(ev ...Event)

Watch watches some events so that, when any of them notifies, co resumes.

func (*Coroutine) Weight added in v0.8.0

func (co *Coroutine) Weight() Weight

Weight returns the weight of co.

func (*Coroutine) Yield added in v0.2.0

func (co *Coroutine) Yield(ev ...Event) Result

Yield returns a Result that will cause co to yield and, when co is resumed, reiterate the running task. Yield also accepts additional events to watch.

type Event

type Event interface {
	// contains filtered or unexported methods
}

Event is the interface of any type that can be watched by a coroutine.

The following types implement Event: Signal and State. Any type that embeds Signal also implements Event, e.g. State.

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

An Executor is a coroutine spawner, and a coroutine runner.

When a coroutine is spawned or resumed, it is added into an internal queue. The Run method then pops and runs each of them from the queue until the queue is emptied. It is done in a single-threaded manner. If one coroutine blocks, no other coroutines can run. The best practice is not to block.

The internal queue is a priority queue. Coroutines added in the queue are sorted by their weights. Coroutines with the same weight are sorted by their levels (child coroutines have one level higher than their parent ones). Coroutines with the same weight and level are sorted by their arrival order (FIFO). Popping the queue removes the first coroutine with the highest weight or the least level.

Manually calling the Run method is usually not desired. One would instead use the Autorun method to set up an autorun function to calling the Run method automatically whenever a coroutine is spawned or resumed. An Executor never calls the autorun function twice at the same time.

func (*Executor) Autorun

func (e *Executor) Autorun(f func())

Autorun sets up an autorun function to calling the Run method automatically whenever a coroutine is spawned or resumed.

One must pass a function that calls the Run method.

If f blocks, the Spawn method may block too. The best practice is not to block.

func (*Executor) Run

func (e *Executor) Run()

Run pops and runs every coroutine in the queue until the queue is emptied.

Run must not be called twice at the same time.

func (*Executor) Spawn

func (e *Executor) Spawn(t Task)

Spawn creates a coroutine with default weight to work on t.

The coroutine is added in a queue. To run it, either call the Run method, or call the Autorun method to set up an autorun function beforehand.

Spawn is safe for concurrent use.

func (*Executor) SpawnWeighted added in v0.7.0

func (e *Executor) SpawnWeighted(w Weight, t Task)

SpawnWeighted creates a coroutine with weight w to work on t.

The coroutine is added in a queue. To run it, either call the Run method, or call the Autorun method to set up an autorun function beforehand.

SpawnWeighted is safe for concurrent use.

type PendingResult added in v0.12.0

type PendingResult struct {
	// contains filtered or unexported fields
}

PendingResult is the return type of the Coroutine.Await method. A PendingResult is an intermediate value that must be transformed into a Result with one of its methods before returning from a Task.

func (PendingResult) Break added in v0.12.0

func (pr PendingResult) Break() Result

Break returns a Result that will cause the running coroutine to yield and, when resumed, break a Loop (or LoopN).

func (PendingResult) Continue added in v0.12.0

func (pr PendingResult) Continue() Result

Continue returns a Result that will cause the running coroutine to yield and, when resumed, continue a Loop (or LoopN).

func (PendingResult) End added in v0.12.0

func (pr PendingResult) End() Result

End returns a Result that will cause the running coroutine to yield and, when resumed, end the running task.

func (PendingResult) Exit added in v0.13.0

func (pr PendingResult) Exit() Result

Exit returns a Result that will cause the running coroutine to yield and, when resumed, cause the running coroutine to exit.

func (PendingResult) Panic added in v0.17.0

func (pr PendingResult) Panic(v any) Result

Panic returns a Result that will cause the running coroutine to yield and, when resumed, cause the running coroutine to behave like there's a panic. Unlike the built-in panic function, Panic leaves no stack trace behind. Please use with caution.

func (PendingResult) Reiterate added in v0.12.0

func (pr PendingResult) Reiterate() Result

Reiterate returns a Result that will cause the running coroutine to yield and, when resumed, reiterate the running task.

func (PendingResult) Return added in v0.12.0

func (pr PendingResult) Return() Result

Return returns a Result that will cause the running coroutine to yield and, when resumed, return from a Func.

func (PendingResult) Then added in v0.12.0

func (pr PendingResult) Then(t Task) Result

Then returns a Result that will cause the running coroutine to yield and, when resumed, make a transition to work on another Task.

func (PendingResult) Until added in v0.12.0

func (pr PendingResult) Until(f func() bool) PendingResult

Until transforms pr into one with a condition. Affected coroutines remain suspended until the condition is met.

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result is the type of the return value of a Task function. A Result determines what next for a coroutine to do after running a task.

A Result can be created by calling one of the following methods:

These methods may have side effects. One should never store a Result in a variable and overwrite it with another, before returning it. Instead, one should just return a Result right after it is created.

type Semaphore added in v0.2.0

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore provides a way to bound asynchronous access to a resource. The callers can request access with a given weight.

Note that this Semaphore type does not provide backpressure for spawning a lot of tasks. One should instead look for a sync implementation.

A Semaphore must not be shared by more than one Executor.

Example
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/b97tsk/async"
)

func main() {
	var wg sync.WaitGroup // For keeping track of goroutines.

	var myExecutor async.Executor

	myExecutor.Autorun(func() { wg.Go(myExecutor.Run) })

	mySemaphore := async.NewSemaphore(12)

	for n := async.Weight(1); n <= 8; n++ {
		myExecutor.Spawn(mySemaphore.Acquire(n).Then(async.Do(func() {
			fmt.Println(n)
			wg.Go(func() {
				time.Sleep(100 * time.Millisecond)
				myExecutor.Spawn(async.Do(func() { mySemaphore.Release(n) }))
			})
		})))
	}

	wg.Wait()

}
Output:

1
2
3
4
5
6
7
8
Example (Cancel)
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/b97tsk/async"
)

func main() {
	var wg sync.WaitGroup // For keeping track of goroutines.

	var myExecutor async.Executor

	myExecutor.Autorun(func() { wg.Go(myExecutor.Run) })

	mySemaphore := async.NewSemaphore(3)

	myExecutor.Spawn(func(co *async.Coroutine) async.Result {
		// Four Acquire calls, only two of them can succeed;
		// the other two get canceled later when co ends.
		for n := async.Weight(1); n <= 4; n++ {
			co.Spawn(mySemaphore.Acquire(n).Then(async.Do(func() {
				fmt.Println(n)
			})))
		}

		var sig async.Signal

		wg.Go(func() {
			time.Sleep(100 * time.Millisecond)
			myExecutor.Spawn(async.Do(sig.Notify))
		})

		return co.Await(&sig).End()
	})

	wg.Wait()

}
Output:

1
2

func NewSemaphore added in v0.2.0

func NewSemaphore(n Weight) *Semaphore

NewSemaphore creates a new weighted semaphore with the given maximum combined weight.

func (*Semaphore) Acquire added in v0.2.0

func (s *Semaphore) Acquire(n Weight) Task

Acquire returns a Task that awaits until a weight of n is acquired from the semaphore, and then ends.

func (*Semaphore) Release added in v0.2.0

func (s *Semaphore) Release(n Weight)

Release releases the semaphore with a weight of n.

One should only call this method in a Task function.

func (*Semaphore) TryAcquire added in v0.16.0

func (s *Semaphore) TryAcquire(n Weight) bool

TryAcquire acquires the semaphore with a weight of n without blocking. On success, returns true. On failure, returns false and leaves the semaphore unchanged.

Without proper synchronization, one should only call this method in a Task function.

type Signal

type Signal struct {
	// contains filtered or unexported fields
}

Signal is a type that implements Event.

Calling the Notify method of a signal, in a Task function, resumes any coroutine that is watching the signal.

A Signal must not be shared by more than one Executor.

func (*Signal) Notify

func (s *Signal) Notify()

Notify resumes any coroutine that is watching s.

One should only call this method in a Task function.

type State

type State[T any] struct {
	Signal
	// contains filtered or unexported fields
}

A State is a Signal that carries a value. To retrieve the value, call the Get method.

Calling the Set method of a state, in a Task function, updates the value and resumes any coroutine that is watching the state.

A State must not be shared by more than one Executor.

func NewState

func NewState[T any](v T) *State[T]

NewState creates a new State with its initial value set to v.

func (*State[T]) Await added in v0.12.0

func (s *State[T]) Await(f func(v T) bool) Task

Await returns a Task that awaits until the value of s meets a specified condition, and then ends.

Example

This example demonstrates how to await a state until a condition is met.

package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	myExecutor.Spawn(myState.Await(
		func(v int) bool { return v >= 3 },
	).Then(async.Do(func() {
		fmt.Println(myState.Get()) // Prints 3.
	})))

	for i := 1; i <= 5; i++ {
		myExecutor.Spawn(async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 5.

}
Output:

3
5

func (*State[T]) Get

func (s *State[T]) Get() T

Get retrieves the value of s.

Without proper synchronization, one should only call this method in a Task function.

func (*State[T]) Set

func (s *State[T]) Set(v T)

Set updates the value of s and resumes any coroutine that is watching s.

One should only call this method in a Task function.

func (*State[T]) Update added in v0.3.0

func (s *State[T]) Update(f func(v T) T)

Update sets the value of s to f(s.Get()) and resumes any coroutine that is watching s.

One should only call this method in a Task function.

type Task

type Task func(co *Coroutine) Result

A Task is a piece of work that a coroutine is given to do when it is spawned. The return value of a task, a Result, determines what next for a coroutine to do.

The argument co must not escape to a non-child coroutine or another goroutine because, co may be put into pool for later reuse when co completes.

func Await added in v0.3.0

func Await(ev ...Event) Task

Await returns a Task that awaits some events until any of them notifies, and then ends. If ev is empty, Await returns a Task that never ends.

func Block added in v0.3.0

func Block(s ...Task) Task

Block returns a Task that runs each of the given tasks in sequence. When one task ends, Block runs another.

Example

This example demonstrates how to run a block of tasks. A block can have zero or more tasks. A block runs tasks in sequence.

package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	myExecutor.Spawn(func(co *async.Coroutine) async.Result {
		var t async.Task

		t = async.Block(
			async.Await(&myState),
			async.Do(func() {
				if v := myState.Get(); v%2 != 0 {
					fmt.Println(v)
				}
			}),
			func(co *async.Coroutine) async.Result {
				if v := myState.Get(); v >= 7 {
					return co.End()
				}
				return co.Transition(t) // Transition to t again to form a loop.
			},
		)

		return co.Transition(t)
	})

	for i := 1; i <= 9; i++ {
		myExecutor.Spawn(async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 9.

}
Output:

1
3
5
7
9

func Break added in v0.3.0

func Break() Task

Break returns a Task that breaks a Loop (or LoopN).

func Continue added in v0.3.0

func Continue() Task

Continue returns a Task that continues a Loop (or LoopN).

func Defer added in v0.3.0

func Defer(t Task) Task

Defer returns a Task that adds t for execution when returning from a Func. Deferred tasks are executed in last-in-first-out (LIFO) order.

func Do

func Do(f func()) Task

Do returns a Task that calls f, and then ends.

func End added in v0.3.0

func End() Task

End returns a Task that ends without doing anything.

func Exit added in v0.3.0

func Exit() Task

Exit returns a Task that causes the coroutine that runs it to exit. All deferred tasks are run before the coroutine exits.

func FromSeq added in v0.4.0

func FromSeq(seq iter.Seq[Task]) Task

FromSeq returns a Task that runs each of the tasks from seq in sequence.

Caveat: requires spawning a goroutine (which is stackful) when running the returned task. The goroutine leaks, as well as the coroutine that runs the returned task, if the returned task never ends.

Example
package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	myExecutor.Spawn(async.FromSeq(
		func(yield func(async.Task) bool) {
			await := async.Await(&myState)
			for yield(await) {
				v := myState.Get()
				if v%2 != 0 {
					fmt.Println(v)
				}
				if v >= 7 {
					return
				}
			}
		},
	))

	for i := 1; i <= 9; i++ {
		myExecutor.Spawn(async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 9.

}
Output:

1
3
5
7
9

func Func added in v0.3.0

func Func(t Task) Task

Func returns a Task that runs t in a function scope. Spawned tasks are considered surrounded by an invisible Func.

Example
package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	myExecutor.Spawn(async.Block(
		async.Defer( // Note that spawned tasks are considered surrounded by an invisible async.Func.
			async.Do(func() { fmt.Println("defer 1") }),
		),
		async.Func(async.Block( // A block in a function scope.
			async.Defer(
				async.Do(func() { fmt.Println("defer 2") }),
			),
			async.Loop(async.Block(
				async.Await(&myState),
				func(co *async.Coroutine) async.Result {
					if v := myState.Get(); v%2 == 0 {
						return co.Continue()
					}
					return co.End()
				},
				async.Do(func() {
					fmt.Println(myState.Get())
				}),
				func(co *async.Coroutine) async.Result {
					if v := myState.Get(); v >= 7 {
						return co.Return() // Return here.
					}
					return co.End()
				},
			)),
			async.Do(func() { fmt.Println("after Loop") }), // Didn't run due to early return.
		)),
		async.Do(func() { fmt.Println("after Func") }),
	))

	for i := 1; i <= 9; i++ {
		myExecutor.Spawn(async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 9.

}
Output:

1
3
5
7
defer 2
after Func
defer 1
9
Example (Exit)
package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	myExecutor.Spawn(async.Block(
		async.Defer( // Note that spawned tasks are considered surrounded by an invisible async.Func.
			async.Do(func() { fmt.Println("defer 1") }),
		),
		async.Func(async.Block( // A block in a function scope.
			async.Defer(
				async.Do(func() { fmt.Println("defer 2") }),
			),
			async.Loop(async.Block(
				async.Await(&myState),
				func(co *async.Coroutine) async.Result {
					if v := myState.Get(); v%2 == 0 {
						return co.Continue()
					}
					return co.End()
				},
				async.Do(func() {
					fmt.Println(myState.Get())
				}),
				func(co *async.Coroutine) async.Result {
					if v := myState.Get(); v >= 7 {
						return co.Exit() // Exit here.
					}
					return co.End()
				},
			)),
			async.Do(func() { fmt.Println("after Loop") }), // Didn't run due to early exit.
		)),
		async.Do(func() { fmt.Println("after Func") }), // Didn't run due to early exit.
	))

	for i := 1; i <= 9; i++ {
		myExecutor.Spawn(async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 9.

}
Output:

1
3
5
7
defer 2
defer 1
9
Example (Tailcall)

This example demonstrates how to make tail-calls in an async.Func. Tail-calls are not recommended and should be avoided when possible. Without tail-call optimization, this example shall panic.

package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	// Case 1: Making tail-call in the last task of a block.
	myExecutor.Spawn(func(co *async.Coroutine) async.Result {
		var n int

		var t async.Task

		t = async.Func(async.Block(
			async.End(),
			async.End(),
			async.End(),
			func(co *async.Coroutine) async.Result { // Last task in the block.
				if n < 2000000 {
					n++
					return co.Transition(t) // Tail-call here.
				}
				return co.End()
			},
		))

		return co.Transition(t.Then(async.Do(func() { fmt.Println(n) })))
	})

	// Case 2: Making tail-call anywhere.
	myExecutor.Spawn(func(co *async.Coroutine) async.Result {
		var n int

		var t async.Task

		t = async.Func(async.Block(
			func(co *async.Coroutine) async.Result {
				if n < 2000000 {
					n++
					co.Defer(t)        // Tail-call here (using the only defer call as a workaround).
					return co.Return() // Early return.
				}
				return co.End()
			},
			async.End(),
			async.End(),
			async.End(),
		))

		return co.Transition(t.Then(async.Do(func() { fmt.Println(n) })))
	})

}
Output:

2000000
2000000

func Join added in v0.5.0

func Join(s ...Task) Task

Join returns a Task that runs each of the given tasks in its own child coroutine and awaits until all of them complete, and then ends.

When passed no arguments, Join returns a Task that never ends.

Example
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/b97tsk/async"
)

func main() {
	var wg sync.WaitGroup // For keeping track of goroutines.

	var myExecutor async.Executor

	myExecutor.Autorun(func() { wg.Go(myExecutor.Run) })

	var s1, s2 async.State[int]

	myExecutor.Spawn(async.Block(
		async.Join(
			func(co *async.Coroutine) async.Result {
				wg.Go(func() {
					time.Sleep(500 * time.Millisecond) // Heavy work #1 here.
					ans := 15
					myExecutor.Spawn(async.Do(func() { s1.Set(ans) }))
				})
				return co.Await(&s1).End() // Awaits until &s1 notifies, then ends.
			},
			func(co *async.Coroutine) async.Result {
				wg.Go(func() {
					time.Sleep(1500 * time.Millisecond) // Heavy work #2 here.
					ans := 27
					myExecutor.Spawn(async.Do(func() { s2.Set(ans) }))
				})
				return co.Await(&s2).End() // Awaits until &s2 notifies, then ends.
			},
		),
		async.Do(func() { fmt.Println("s1 + s2 =", s1.Get()+s2.Get()) }),
	))

	wg.Wait()

}
Output:

s1 + s2 = 42

func Loop added in v0.3.0

func Loop(t Task) Task

Loop returns a Task that forms a loop, which would run t repeatedly. Both Coroutine.Break and Break can break this loop early. Both Coroutine.Continue and Continue can continue this loop early.

Example
package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	myExecutor.Spawn(async.Loop(async.Block(
		async.Await(&myState),
		func(co *async.Coroutine) async.Result {
			if v := myState.Get(); v%2 == 0 {
				return co.Continue()
			}
			return co.End()
		},
		async.Do(func() {
			fmt.Println(myState.Get())
		}),
		func(co *async.Coroutine) async.Result {
			if v := myState.Get(); v >= 7 {
				return co.Break()
			}
			return co.End()
		},
	)))

	for i := 1; i <= 9; i++ {
		myExecutor.Spawn(async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 9.

}
Output:

1
3
5
7
9

func LoopN added in v0.3.0

func LoopN[Int intType](n Int, t Task) Task

LoopN returns a Task that forms a loop, which would run t repeatedly for n times. Both Coroutine.Break and Break can break this loop early. Both Coroutine.Continue and Continue can continue this loop early.

Example
package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	myExecutor.Spawn(async.LoopN(7, async.Block(
		async.Await(&myState),
		func(co *async.Coroutine) async.Result {
			if v := myState.Get(); v%2 == 0 {
				return co.Continue()
			}
			return co.End()
		},
		async.Do(func() {
			fmt.Println(myState.Get())
		}),
	)))

	for i := 1; i <= 9; i++ {
		myExecutor.Spawn(async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 9.

}
Output:

1
3
5
7
9

func MergeSeq added in v0.12.0

func MergeSeq(concurrency int, seq iter.Seq[Task]) Task

MergeSeq returns a Task that runs each of the tasks from seq in its own child coroutine concurrently until all of them complete, and then ends. The argument concurrency specifies the maximum number of tasks that can run at the same time. If it is zero, no tasks will be run and MergeSeq never ends. It may wrap around. The maximum value of concurrency is -1.

Caveat: requires spawning a goroutine (which is stackful) when running the returned task. The goroutine leaks, as well as the coroutine that runs the returned task, if the returned task never ends.

Example
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/b97tsk/async"
)

func main() {
	var wg sync.WaitGroup // For keeping track of goroutines.

	var myExecutor async.Executor

	myExecutor.Autorun(func() { wg.Go(myExecutor.Run) })

	sleep := func(d time.Duration) async.Task {
		return func(co *async.Coroutine) async.Result {
			var sig async.Signal
			wg.Add(1) // Keep track of timers too.
			tm := time.AfterFunc(d, func() {
				defer wg.Done()
				myExecutor.Spawn(async.Do(sig.Notify))
			})
			co.CleanupFunc(func() {
				if tm.Stop() {
					wg.Done()
				}
			})
			return co.Await(&sig).End()
		}
	}

	myExecutor.Spawn(async.MergeSeq(3, func(yield func(async.Task) bool) {
		defer fmt.Println("done")
		for n := 1; n <= 6; n++ {
			d := time.Duration(n*100) * time.Millisecond
			f := func() { fmt.Println(n) }
			t := sleep(d).Then(async.Do(f))
			if !yield(t) {
				return
			}
		}
	}))

	wg.Wait()
	fmt.Println("--- SEPARATOR ---")

	myExecutor.Spawn(async.Select(
		sleep(1000*time.Millisecond), // Cancel the following task after a period of time.
		async.MergeSeq(3, func(yield func(async.Task) bool) {
			defer fmt.Println("done")
			for n := 1; ; n++ { // Infinite loop.
				d := time.Duration(n*100) * time.Millisecond
				f := func() { fmt.Println(n) }
				t := sleep(d).Then(async.Do(f))
				if !yield(t) {
					return
				}
			}
		}),
	))

	wg.Wait()

}
Output:

1
2
3
4
done
5
6
--- SEPARATOR ---
1
2
3
4
5
6
done

func NonCancelable added in v0.16.0

func NonCancelable(t Task) Task

NonCancelable returns a Task that runs t in a non-cancelable context, preventing it from being canceled by a parent coroutine.

Example
package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var sig1, sig2 async.Signal

	{
		fmt.Println("without NonCancelable:")

		myExecutor.Spawn(async.Block(
			async.Select(
				async.Await(&sig1), // When sig1 notifies, cancel the following task.
				async.Block(
					async.Defer(async.Block(
						async.Await(&sig2), // Without NonCancelable, canceled coroutines cannot yield.
						async.Do(func() { fmt.Println("after Await") }),
					)),
					async.Await(), // Awaits for cancellation.
				),
			),
			async.Do(func() { fmt.Println("after Select") }),
		))

		myExecutor.Spawn(async.Do(sig1.Notify))
		myExecutor.Spawn(async.Do(sig2.Notify))
	}

	{
		fmt.Println("with NonCancelable:")

		myExecutor.Spawn(async.Block(
			async.Select(
				async.Await(&sig1), // When sig1 notifies, cancel the following task.
				async.Block(
					async.Defer(async.Block(
						// With NonCancelable, even canceled coroutines can yield, too.
						async.NonCancelable(async.Await(&sig2)),
						async.Do(func() { fmt.Println("after Await") }),
					)),
					async.Await(), // Awaits for cancellation.
				),
			),
			async.Do(func() { fmt.Println("after Select") }),
		))

		myExecutor.Spawn(async.Do(sig1.Notify))
		myExecutor.Spawn(async.Do(sig2.Notify))
	}

	{
		fmt.Println("additional tests:")

		for i := range 5 {
			myExecutor.Spawn(async.Block(
				async.Defer(async.Do(func() { fmt.Println(i) })),
				async.LoopN(1, func(co *async.Coroutine) async.Result {
					co.Spawn(async.NonCancelable(async.Await(&sig1)))
					switch i {
					case 0:
						return co.End()
					case 1:
						return co.Break()
					case 2:
						return co.Continue()
					case 3:
						return co.Return()
					default:
						return co.Exit()
					}
				}),
				async.Do(func() { fmt.Println("after LoopN") }),
			))
			myExecutor.Spawn(async.Do(sig1.Notify))
		}
	}

}
Output:

without NonCancelable:
after Select
with NonCancelable:
after Await
after Select
additional tests:
after LoopN
0
after LoopN
1
after LoopN
2
3
4

func Panic added in v0.17.0

func Panic(v any) Task

Panic returns a Task that causes the coroutine that runs it to behave like there's a panic. Unlike the built-in panic function, Panic leaves no stack trace behind. Please use with caution.

func Return added in v0.3.0

func Return() Task

Return returns a Task that returns from a surrounding Func.

func Select added in v0.5.0

func Select(s ...Task) Task

Select returns a Task that runs each of the given tasks in its own child coroutine and awaits until any of them completes, and then ends. When Select ends, tasks other than the one that completes are canceled.

When passed no arguments, Select returns a Task that never ends.

Example
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/b97tsk/async"
)

func main() {
	var wg sync.WaitGroup // For keeping track of goroutines.

	var myExecutor async.Executor

	myExecutor.Autorun(func() { wg.Go(myExecutor.Run) })

	var s1, s2 async.State[int]

	myExecutor.Spawn(async.Block(
		async.Select(
			func(co *async.Coroutine) async.Result {
				wg.Go(func() {
					time.Sleep(500 * time.Millisecond) // Heavy work #1 here.
					ans := 15
					myExecutor.Spawn(async.Do(func() { s1.Set(ans) }))
				})
				return co.Await(&s1).End() // Awaits until &s1 notifies, then ends.
			},
			func(co *async.Coroutine) async.Result {
				wg.Go(func() {
					time.Sleep(1500 * time.Millisecond) // Heavy work #2 here.
					ans := 27
					myExecutor.Spawn(async.Do(func() { s2.Set(ans) }))
				})
				return co.Await(&s2).End() // Awaits until &s2 notifies, then ends.
			},
		),
		async.Do(func() { fmt.Println("s1 + s2 =", s1.Get()+s2.Get()) }),
	))

	wg.Wait()

}
Output:

s1 + s2 = 15
Example (WithCancel)

Without cancellation, ExampleSelect takes the same amount of time as ExampleJoin, which is unacceptable. The following example fixes that.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/b97tsk/async"
)

func main() {
	var wg sync.WaitGroup // For keeping track of goroutines.

	var myExecutor async.Executor

	myExecutor.Autorun(func() { wg.Go(myExecutor.Run) })

	var s1, s2 async.State[int]

	myExecutor.Spawn(async.Block(
		async.Func(
			func(co *async.Coroutine) async.Result {
				ctx, cancel := context.WithCancel(context.Background())
				co.Defer(async.Do(cancel))
				return co.Transition(async.Select(
					func(co *async.Coroutine) async.Result {
						wg.Go(func() {
							select { // Heavy work #1 here.
							case <-time.After(500 * time.Millisecond):
							case <-ctx.Done():
								return // Cancel work when ctx gets canceled.
							}
							ans := 15
							myExecutor.Spawn(async.Do(func() { s1.Set(ans) }))
						})
						return co.Await(&s1).End() // Awaits until &s1 notifies, then ends.
					},
					func(co *async.Coroutine) async.Result {
						wg.Go(func() {
							select { // Heavy work #2 here.
							case <-time.After(1500 * time.Millisecond):
							case <-ctx.Done():
								return // Cancel work when ctx gets canceled.
							}
							ans := 27
							myExecutor.Spawn(async.Do(func() { s2.Set(ans) }))
						})
						return co.Await(&s2).End() // Awaits until &s2 notifies, then ends.
					},
				))
			},
		),
		async.Do(func() { fmt.Println("s1 + s2 =", s1.Get()+s2.Get()) }),
	))

	wg.Wait()

}
Output:

s1 + s2 = 15

func Spawn added in v0.12.0

func Spawn(t Task) Task

Spawn returns a Task that runs t in a child coroutine and awaits until t completes, and then ends.

Spawn(t) is equivalent to Join(t) or Select(t), but cheaper and clearer.

Example
package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	// Exit (async.Exit or (*async.Coroutine).Exit) causes the coroutine that runs it to exit.
	// Tasks after Exit do not run.
	myExecutor.Spawn(async.Exit().Then(async.Do(func() { fmt.Println("after Exit") })))

	// With the help of async.Spawn, Exit only affects child coroutines.
	// The parent one continues to run tasks after async.Spawn.
	myExecutor.Spawn(async.Spawn(async.Exit()).Then(async.Do(func() { fmt.Println("after Spawn") })))

}
Output:

after Spawn

func (Task) Then added in v0.2.0

func (t Task) Then(next Task) Task

Then returns a Task that first works on t, then next after t ends.

To chain multiple tasks, use Block function.

Example

This example demonstrates how to run a task after another. To run multiple tasks in sequence, use async.Block instead.

package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	a := func(co *async.Coroutine) async.Result {
		co.Watch(&myState)

		v := myState.Get()
		fmt.Println(v, "(a)")

		if v < 3 {
			return co.Yield()
		}

		return co.Transition(func(co *async.Coroutine) async.Result {
			co.Watch(&myState)

			v := myState.Get()
			fmt.Println(v, "(transitioned)")

			if v < 5 {
				return co.Yield()
			}

			return co.End()
		})
	}

	b := func(co *async.Coroutine) async.Result {
		co.Watch(&myState)

		v := myState.Get()
		fmt.Println(v, "(b)")

		if v < 7 {
			return co.Yield()
		}

		return co.End()
	}

	myExecutor.Spawn(async.Task(a).Then(b))

	for i := 1; i <= 9; i++ {
		myExecutor.Spawn(async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 9.

}
Output:

0 (a)
1 (a)
2 (a)
3 (a)
3 (transitioned)
4 (transitioned)
5 (transitioned)
5 (b)
6 (b)
7 (b)
9

type WaitGroup added in v0.2.0

type WaitGroup struct {
	Signal
	// contains filtered or unexported fields
}

A WaitGroup is a Signal with a counter.

Calling the Add or Done method of a WaitGroup, in a Task function, updates the counter and, when the counter becomes zero, resumes any coroutine that is watching the WaitGroup.

A WaitGroup must not be shared by more than one Executor.

Example
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/b97tsk/async"
)

func main() {
	var wg sync.WaitGroup // For keeping track of goroutines.

	var myExecutor async.Executor

	myExecutor.Autorun(func() { wg.Go(myExecutor.Run) })

	var myState struct {
		wg     async.WaitGroup
		v1, v2 int
	}

	myState.wg.Add(2) // Note that async.WaitGroup is not safe for concurrent use.

	wg.Go(func() {
		time.Sleep(500 * time.Millisecond) // Heavy work #1 here.
		ans := 15
		myExecutor.Spawn(async.Do(func() {
			myState.v1 = ans
			myState.wg.Done()
		}))
	})

	wg.Go(func() {
		time.Sleep(500 * time.Millisecond) // Heavy work #2 here.
		ans := 27
		myExecutor.Spawn(async.Do(func() {
			myState.v2 = ans
			myState.wg.Done()
		}))
	})

	myExecutor.Spawn(myState.wg.Await().Then(async.Do(func() {
		fmt.Println("v1 + v2 =", myState.v1+myState.v2)
	})))

	wg.Wait()

}
Output:

v1 + v2 = 42

func (*WaitGroup) Add added in v0.2.0

func (wg *WaitGroup) Add(delta int)

Add adds delta, which may be negative, to the WaitGroup counter. If the WaitGroup counter becomes zero, Add resumes any coroutine that is watching wg. If the WaitGroup counter is negative, Add panics.

func (*WaitGroup) Await added in v0.2.0

func (wg *WaitGroup) Await() Task

Await returns a Task that awaits until the WaitGroup counter becomes zero, and then ends.

func (*WaitGroup) Done added in v0.2.0

func (wg *WaitGroup) Done()

Done decrements the WaitGroup counter by one.

type Weight added in v0.7.0

type Weight int64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL