Skip to content

Async Programming in Incan

Incan provides full async/await support powered by the Tokio runtime. This guide covers all async features available in Incan.

Async is import-activated

async and await are soft keywords: they become reserved keywords only after importing std.async (for example import std.async or from std.async.time import sleep).

Coming from Python?

Incan's async model mirrors Python's asyncio — you'll find async def, await, task spawning, and timeouts all work the same way.

The key difference is under the hood: Incan compiles to Rust and uses Tokio, giving you the familiar Python syntax with Rust-level async performance.

Quick Start

from std.async.time import sleep

async def fetch_data() -> str:
    await sleep(1.0)  # Wait 1 second
    return "data"

def main() -> None:
    # When async is used, main automatically gets #[tokio::main]
    println("Starting...")

Core Concepts

Cancellation Vocabulary

Async APIs in std.async document cancellation with four contract terms:

Term Meaning
cancel-safe Cancelling a pending wait does not consume the value, acquire the resource, or otherwise complete the operation.
cancel-safe-but-lossy Cancelling the wait does not complete the operation, but a value or queue position owned by that wait may be lost.
not cancel-safe Cancelling a pending wait can break the operation's coordination contract or leave other participants waiting.
durable once spawned Work continues after it is spawned unless it finishes or is explicitly aborted; dropping the handle detaches the work and loses the result.

Async Functions

Declare async functions with async def:

from std.async.time import sleep

async def do_work() -> int:
    await sleep(0.5)
    return 42

Await

Use await to wait for an async operation:

import std.async

async def process() -> str:
    data = await fetch_data()
    result = await transform(data)
    return result

await is only valid inside async def bodies and async methods Using await in an ordinary def or sync method is a type error.

Coming from Python?

Incan's await follows the same rules as in Python: it is disallowed outside an async scope.

Time Primitives

sleep

Pause execution for a duration:

from std.async.time import sleep, sleep_ms

async def demo() -> None:
    await sleep(1.5)      # Sleep 1.5 seconds
    await sleep_ms(500)   # Sleep 500 milliseconds

timeout

Run an operation with a time limit:

from std.async.time import timeout

async def demo() -> None:
    result = await timeout(5.0, slow_operation)
    match result:
        case Ok(value): println(f"Success: {value}")
        case Err(e): println("Operation timed out")

timeout() and timeout_ms() cancel the supplied future when the deadline expires. That is appropriate for ordinary request work where the timed-out operation should stop. If the work must keep running after the deadline path returns, spawn it first and use timeout_join() so the timeout result preserves the live JoinHandle.

timeout_join

Wait for spawned work without cancelling it when the deadline expires:

from std.async.task import spawn
from std.async.time import timeout_join, TimeoutJoinOutcome

handle = spawn(write_audit_event(event))

match await timeout_join(1.0, handle):
    case TimeoutJoinOutcome.Completed(_): println("audit event written")
    case TimeoutJoinOutcome.JoinFailed(err): println(err.message())
    case TimeoutJoinOutcome.TimedOut(live_handle):
        remember(live_handle)

Use timeout_join() for side-effecting work that must keep running once spawned, such as durable writes, protocol commits, and file flushes. On timeout, the task continues running and TimeoutJoinOutcome.TimedOut(handle) carries the live handle so you can await it later, store it in a task registry, or abort it deliberately. If an outer cancellation boundary cancels the timeout_join() call itself before it returns, the helper-owned handle is dropped and the task is detached unless you arranged another completion path. For spawn_blocking() handles, abort() can only prevent queued work from starting; blocking work that has already started must finish on its own.

Task Spawning

spawn

Run a task concurrently:

from std.async.task import spawn
from std.async.time import sleep

async def background_work() -> int:
    await sleep(2.0)
    return 42

# Spawn returns immediately
handle = spawn(background_work)

# Do other work...
println("Working on other things...")

# Wait for the spawned task
result = await handle
println(f"Background task returned: {result}")

Spawned tasks are durable once spawned. Dropping handle detaches the task and loses the result; it does not cancel the task. Use handle.abort() when an async task should be cancelled.

spawn_blocking

Run CPU-intensive or blocking code on a dedicated thread:

from std.async.task import spawn_blocking

def heavy_computation() -> int:
    result = 0
    for i in range(1_000_000):
        result = result + i
    return result

# Won't block the async runtime
result = await spawn_blocking(heavy_computation)

spawn_blocking() is for bounded blocking work that eventually finishes on its own. Dropping its JoinHandle detaches the work and loses the result, and abort() cannot stop blocking work after it starts running. abort() can only prevent a queued blocking task from starting.

yield_now

Cooperatively yield to let other tasks run:

from std.async.task import yield_now

async def cooperative_loop() -> None:
    for i in range(10000):
        # Do some work...
        if i % 100 == 0:
            await yield_now()  # Let other tasks run

Channels

Channels enable safe message passing between concurrent tasks. They're the primary way to communicate between async tasks without shared mutable state.

MPSC Channel (Multi-Producer, Single-Consumer)

MPSC stands for Multi-Producer, Single-Consumer:

  • Multiple senders can send messages (clone the sender to share it)
  • One receiver processes all messages
  • Messages arrive in order (FIFO)

When to use MPSC

Use Case Why MPSC
Worker pool → result collector Many workers, one aggregator
Event bus Multiple event sources, one handler
Logging Multiple tasks → single log writer
Request queue Multiple clients → one server

Bounded channel (recommended):

from std.async.channel import channel

# Create channel with buffer size 32
tx, rx = channel[str](32)

# Sender - blocks if buffer is full (backpressure)
async def producer() -> None:
    await tx.send("hello")  # Waits if buffer full
    await tx.send("world")

# Receiver - blocks until message available
async def consumer() -> None:
    while let Some(msg) = await rx.recv():
        println(f"Got: {msg}")

Cancellation contracts:

  • tx.send(value) is cancel-safe-but-lossy. If it is cancelled while waiting for bounded-channel capacity, the value is not sent and is dropped.
  • tx.reserve() waits for capacity before a value is committed. Use it for critical sends where cancellation must not drop the message value.
  • permit.send(value) is synchronous and either delivers the value or returns it in SendError[T].
  • rx.recv() is cancel-safe. If it is cancelled while waiting, no message is removed from the channel.
  • try_send() and try_recv() return immediately, so there is no pending wait to cancel.

Reserve capacity before building or moving a critical message:

match await tx.reserve():
    Ok(permit) =>
        match permit.send("audit-event"):
            Ok(_) => println("sent")
            Err(err) => println(f"send failed: {err.value}")
    Err(err) => println(err.message())

Multiple producers (clone the sender):

from std.async.channel import channel
from std.async.task import spawn

tx, rx = channel[int](100)

# Clone sender for each producer
tx1 = tx.clone()
tx2 = tx.clone()

spawn(async () -> None:
    await tx1.send(1)
    await tx1.send(2)
)

spawn(async () -> None:
    await tx2.send(100)
    await tx2.send(200)
)

# Single consumer receives from all producers
async def consume() -> None:
    while let Some(n) = await rx.recv():
        println(f"Received: {n}")

Coming from Python?

Incan channels are inspired by Rust's tokio::sync::mpsc, not Python's asyncio.Queue. Key differences:

  • Sender/Receiver split: You get a (tx, rx) pair — clone tx for multiple producers
  • Ownership semantics: When all senders are dropped, the channel closes and recv() returns None
  • No shared queue: Unlike asyncio.Queue, you can't just pass the queue around — you pass senders or the receiver

The Python equivalent would use asyncio.Queue with put()/get(), but lacks the automatic "channel closed" signal when producers finish.

Bounded vs Unbounded:

Type Backpressure Memory Safety Use When
channel[T](n) Yes - send blocks when full Bounded memory Production code
unbounded_channel[T]() No - send never blocks Can grow forever Prototyping, low-volume

Unbounded Channel

send never blocks, but use with caution — if producers outpace consumers, memory grows without limit:

from std.async.channel import unbounded_channel

# No capacity limit - send always succeeds immediately
tx, rx = unbounded_channel[int]()

# These never block
tx.send(1)
tx.send(2)
tx.send(3)

# Consumer still blocks waiting for messages
match await rx.recv():
    case Some(n): println(f"Got: {n}")
    case None: println("Channel closed")

When unbounded is OK:

  • Low message volume
  • Consumers always faster than producers
  • Prototyping (switch to bounded before production)

One-Shot Channel

A oneshot channel sends exactly one value. After sending, the sender is consumed and can't be used again. This is perfect for "request-response" patterns where you spawn a task and wait for its result.

When to use oneshot vs regular channel:

Use Case Channel Type
Stream of messages channel[T]
Single result from spawned task oneshot[T]
Producer-consumer queue channel[T]
Async function return value oneshot[T]

Basic usage:

from std.async.channel import oneshot
from std.async.task import spawn

tx, rx = oneshot[int]()

spawn(async () -> None:
    result = expensive_computation()
    tx.send(result)  # Consumes sender - can only send once
)

# Wait for the result
match await rx.recv():
    case Ok(value): println(f"Got result: {value}")
    case Err(e): println("Sender dropped without sending")

The one-shot receiver is cancel-safe: cancelling rx.recv() does not consume the value. tx.send(value) is synchronous, so it either delivers the value or returns it immediately if the receiver is gone.

Common pattern - returning results from spawned tasks:

from std.async.channel import oneshot
from std.async.task import spawn

async def compute_in_background(input: Data) -> Result[Output, ComputeError]:
    # Create oneshot for the result
    tx, rx = oneshot[Result[Output, ComputeError]]()

    # Spawn the computation
    spawn(async () -> None:
        result = heavy_computation(input)
        tx.send(result)
    )

    # Do other work while computation runs...
    do_other_stuff()

    # Wait for result
    match await rx.recv():
        case Ok(result): return result
        case Err(_): return Err(ComputeError.TaskFailed)

Key differences from regular channels:

  • send() consumes the sender (can only call once)
  • No buffering - it's either empty or has exactly one value
  • Lighter weight than MPSC channels
  • recv() returns Err if sender was dropped without sending

Synchronization Primitives

Incan provides low-level synchronization primitives inspired by Rust's tokio runtime. These give you fine-grained control over shared state and task coordination.

Coming from Python?

Python's asyncio has some equivalents, but Incan's primitives work differently.

Incan Python asyncio Key Difference
Mutex[T] asyncio.Lock Incan wraps the value; Python protects external data
RwLock[T] (no equivalent) Allows multiple readers OR single writer
Semaphore asyncio.Semaphore Similar behavior
Barrier asyncio.Barrier (3.11+) Similar behavior

The key difference: Incan's Mutex[T] and RwLock[T] wrap a value (like Rust), while Python's Lock is just a lock you use alongside your data.

Mutex

Mutual exclusion — ensures only one task can access the wrapped value at a time.

When to use: Multiple tasks need to both read AND write shared state.

API:

  • Mutex.new(value) — Create a mutex wrapping a value
  • await mutex.lock() — Acquire lock, returns a guard
  • guard.get() — Read the value
  • guard.set(new_value) — Write a new value
  • Guard auto-releases when it goes out of scope
from std.async.sync import Mutex

shared_counter = Mutex.new(0)

async def increment() -> None:
    guard = await shared_counter.lock()  # Blocks until lock acquired
    current = guard.get()
    guard.set(current + 1)
    # Lock released when guard goes out of scope

mutex.lock() is cancel-safe-but-lossy: cancellation before the guard is returned does not acquire the lock, but the waiter loses its place in the fairness queue.

Python comparison

Python uses a lock separate from the data. Incan wraps the data (Mutex[T]), so the lock and value are kept together.

# Python - lock is separate from data
lock = asyncio.Lock()
counter = 0

async def increment():
    async with lock:
        counter += 1
from std.async.sync import Mutex

# Incan - lock wraps the data
counter = Mutex.new(0)

async def increment() -> None:
    guard = await counter.lock()
    guard.set(guard.get() + 1)

RwLock

Reader-writer lock — allows multiple readers OR a single writer, but not both simultaneously. More efficient than Mutex when reads are frequent.

When to use: Many tasks read, few tasks write (e.g., configuration, caches).

API:

  • RwLock.new(value) — Create an RwLock wrapping a value
  • await rwlock.read() — Acquire read lock (shared with other readers)
  • await rwlock.write() — Acquire write lock (exclusive)
  • guard.get() — Read the value
  • guard.set(new_value) — Write (only on write guard)
from std.async.sync import RwLock

config = RwLock.new(Config(debug=False))

async def read_config() -> bool:
    guard = await config.read()  # Multiple readers allowed simultaneously
    return guard.get().debug

async def update_config(debug: bool) -> None:
    guard = await config.write()  # Waits for all readers, then exclusive
    guard.set(Config(debug=debug))

rwlock.read() and rwlock.write() are cancel-safe-but-lossy: cancellation before a guard is returned does not acquire the lock, but the waiter loses its place in the fairness queue.

Coming from Python?

Python's asyncio doesn't have RwLock. This is a Rust/systems programming concept.

If you need read-heavy access patterns in Python, you typically just use a Lock for everything.

Semaphore

Counting semaphore — limits how many tasks can access a resource concurrently by managing a pool of "permits."

When to use: Connection pools, rate limiting, bounding concurrent operations.

API:

  • Semaphore.new(n) — Create with n permits
  • await sem.acquire() — Wait for and acquire a permit
  • Permit auto-releases when it goes out of scope
from std.async.sync import Semaphore

# Allow max 3 concurrent connections
connection_limit = Semaphore.new(3)

async def make_request() -> Response:
    permit = await connection_limit.acquire()  # Blocks if no permits
    response = await http_get(url)
    # Permit released automatically when permit goes out of scope
    return response

sem.acquire() is cancel-safe-but-lossy: cancellation before a permit is returned does not acquire a permit, but the waiter loses its place in the fairness queue.

Python equivalent

asyncio.Semaphore(n) works the same way.

Barrier

Synchronization point — makes N tasks wait until all of them reach the barrier before any can proceed.

When to use: Phased algorithms, coordinating startup, map-reduce patterns.

API:

  • Barrier.new(n) — Create barrier for n tasks
  • await barrier.wait() — Wait until all n tasks reach this point
from std.async.sync import Barrier

barrier = Barrier.new(3)  # Wait for 3 tasks

async def worker(id: int) -> None:
    println(f"Worker {id} starting phase 1")
    # ... do phase 1 work ...

    await barrier.wait()  # All 3 must reach here before anyone continues

    println(f"Worker {id} starting phase 2")  # All start phase 2 together

barrier.wait() is cancellation-aware before release: cancelling a pending wait withdraws that participant from the current generation and frees its slot. Remaining participants still need enough active arrivals to complete the generation, so workflows that allow independent participant cancellation should also define how replacement participants arrive or how the whole phase is abandoned. The returned slot is unique within a completed generation, but cancellation can reuse freed slots, so do not treat it as chronological arrival order.

Python equivalent

asyncio.Barrier(n) (added in Python 3.11) works the same way.

Complete Example

See examples/advanced/async_sync.incn for a runnable demo of all four primitives.

Select and Future Race

select_timeout

Simplified timeout returning Option:

from std.async.select import select_timeout

match await select_timeout(2.0, slow_operation):
    case Some(result): println(f"Got: {result}")
    case None: println("Timed out, using default")

select_timeout() has the same cancellation contract as timeout(): if the deadline wins, the supplied future is cancelled. Use it only when the timed-out future may be safely abandoned. For spawned work that must continue, use timeout_join() instead.

Future race syntax is planned as first-completion-wins composition. Losing race arms are cancelled, so loser arms must not contain side effects that are required for correctness after their final suspension point. Put required cleanup in cancellation-safe resources, or spawn durable work before the race and use a handle-preserving wait such as timeout_join() when you still need the result.

Runtime Integration

Automatic Tokio Main

When your program uses async features, the main() function is automatically wrapped with #[tokio::main]:

Tokio is Rust's async runtime: see the Tokio project for a high-level overview.

import std.async

def main() -> None:
    # This becomes async main() under the hood
    # when async primitives are used
    result = await fetch_data()
    println(f"Result: {result}")

Generated Dependencies

The compiler automatically adds tokio to your Cargo.toml:

[dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "sync"] }

Best Practices

1. Don't Block the Runtime

Avoid blocking operations in async code:

from std.async.time import sleep

# BAD: Blocks the async runtime
async def bad() -> None:
    std_thread_sleep(1.0)  # Don't do this!

# GOOD: Use async sleep
async def good() -> None:
    await sleep(1.0)

2. Use spawn_blocking for CPU Work

from std.async.task import spawn_blocking

# BAD: CPU work on async runtime
async def bad() -> int:
    return heavy_computation()

# GOOD: Offload to blocking pool
async def good() -> int:
    return await spawn_blocking(heavy_computation)

3. Use Bounded Channels

Prefer bounded channels to prevent memory issues:

from std.async.channel import channel, unbounded_channel

# Prefer this:
tx, rx = channel[Data](100)

# Over this (unbounded can grow forever):
tx, rx = unbounded_channel[Data]()

4. Handle Cancellation Explicitly

Dropping a task handle detaches the task and loses its result. It does not cancel the task:

from std.async.task import spawn
from std.async.time import sleep

async def cancellable_work() -> str:
    await sleep(10.0)
    return "done"

handle = spawn(cancellable_work)
# This detaches the task and loses the result; the task keeps running.
_ = handle

Use handle.abort() when an async task should be cancelled. For blocking work created with spawn_blocking(), abort() only has a chance to prevent execution before the blocking task starts.

Error Handling

Timeout Errors

from std.async.time import timeout

result = await timeout(1.0, slow_task)
match result:
    case Ok(value): process(value)
    case Err(e): println(f"Timed out: {e}")

Channel Closed

When sending fails because the receiver was dropped, SendError[T] contains the value that couldn't be sent in its .value field:

from std.async.channel import Sender

async def sender(tx: Sender[Message]) -> None:
    msg = Message(id=1, content="important data")

    match await tx.send(msg):
        case Ok(_): println("Sent successfully")
        case Err(e):
            # Recover the unsent value - it's not lost!
            println(f"Channel closed, saving for retry: {e.value:?}")
            save_for_later(e.value)

This pattern is important for reliability. If a channel closes unexpectedly, you can recover and handle the data that failed to send.

See Also